diff --git a/dbgpt/app/knowledge/service.py b/dbgpt/app/knowledge/service.py
index 0aecac6e7..f40001b5b 100644
--- a/dbgpt/app/knowledge/service.py
+++ b/dbgpt/app/knowledge/service.py
@@ -650,12 +650,12 @@ def query_graph(self, space_name, limit):
                 {
                     "id": node.vid,
                     "communityId": node.get_prop("_community_id"),
-                    "name": node.vid,
-                    "type": "",
+                    "name": node.name,
+                    "type": node.get_prop("type") or "",
                 }
             )
         for edge in graph.edges():
             res["edges"].append(
-                {"source": edge.sid, "target": edge.tid, "name": edge.name, "type": ""}
+                {"source": edge.sid, "target": edge.tid, "name": edge.name, "type": edge.get_prop("type") or ""}
             )
         return res
diff --git a/dbgpt/rag/transformer/graph_extractor.py b/dbgpt/rag/transformer/graph_extractor.py
index 18e867683..092a1d193 100644
--- a/dbgpt/rag/transformer/graph_extractor.py
+++ b/dbgpt/rag/transformer/graph_extractor.py
@@ -65,7 +65,7 @@ def _parse_response(self, text: str, limit: Optional[int] = None) -> List[Graph]
             match = re.match(r"\((.*?)#(.*?)\)", line)
             if match:
                 name, summary = [part.strip() for part in match.groups()]
-                graph.upsert_vertex(Vertex(name, description=summary))
+                graph.upsert_vertex(Vertex(name, description=summary, vertex_type="entity"))
         elif current_section == "Relationships":
             match = re.match(r"\((.*?)#(.*?)#(.*?)#(.*?)\)", line)
             if match:
@@ -74,7 +74,7 @@ def _parse_response(self, text: str, limit: Optional[int] = None) -> List[Graph]
                 ]
                 edge_count += 1
                 graph.append_edge(
-                    Edge(source, target, name, description=summary)
+                    Edge(source, target, name, description=summary, edge_type="relation")
                 )
                 if limit and edge_count >= limit:
diff --git a/dbgpt/storage/graph_store/base.py b/dbgpt/storage/graph_store/base.py
index 24a4b467b..e448c63f0 100644
--- a/dbgpt/storage/graph_store/base.py
+++ b/dbgpt/storage/graph_store/base.py
@@ -5,7 +5,7 @@
 from dbgpt._private.pydantic import BaseModel, ConfigDict, Field
 from dbgpt.core import Embeddings
-from dbgpt.storage.graph_store.graph import Direction, Graph
+from dbgpt.storage.graph_store.graph import Direction, Graph, Vertex

 logger = logging.getLogger(__name__)
@@ -44,6 +44,10 @@ def get_vertex_type(self) -> str:
     def get_edge_type(self) -> str:
         """Get the edge type."""

+    @abstractmethod
+    def get_document_vertex(self, doc_name: str) -> Vertex:
+        """Get the document vertex by name."""
+
     @abstractmethod
     def insert_triplet(self, sub: str, rel: str, obj: str):
         """Add triplet."""
@@ -60,6 +64,10 @@ def get_triplets(self, sub: str) -> List[Tuple[str, str]]:
     def delete_triplet(self, sub: str, rel: str, obj: str):
         """Delete triplet."""

+    @abstractmethod
+    def delete_document(self, chunk_ids: str):
+        """Delete a document's chunks and related graph elements by chunk ids."""
+
     @abstractmethod
     def truncate(self):
         """Truncate Graph."""
@@ -87,6 +95,14 @@ def explore(
     ) -> Graph:
         """Explore on graph."""

+    def explore_text_link(
+        self,
+        subs: List[str],
+        depth: Optional[int] = None,
+        limit: Optional[int] = None,
+    ) -> Graph:
+        """Explore document-chunk text links on graph."""
+
     @abstractmethod
     def query(self, query: str, **args) -> Graph:
         """Execute a query."""
diff --git a/dbgpt/storage/graph_store/graph.py b/dbgpt/storage/graph_store/graph.py
index 555bcc14d..da4e5c232 100644
--- a/dbgpt/storage/graph_store/graph.py
+++ b/dbgpt/storage/graph_store/graph.py
@@ -188,11 +188,11 @@ def get_neighbor_edges(
         """Get neighbor edges."""

     @abstractmethod
-    def vertices(self) -> Iterator[Vertex]:
+    def vertices(self, vertex_type: Optional[str] = None) -> Iterator[Vertex]:
         """Get vertex iterator."""

     @abstractmethod
-    def edges(self) -> Iterator[Edge]:
+    def edges(self, edge_type: Optional[str] = None) -> Iterator[Edge]:
        """Get edge iterator."""

     @abstractmethod
@@ -335,13 +335,21 @@ def unique_elements(elements):
         return itertools.islice(es, limit) if limit else es

-    def vertices(self) -> Iterator[Vertex]:
+    def vertices(self, vertex_type: Optional[str] = None) -> Iterator[Vertex]:
         """Return vertices."""
-        return iter(self._vs.values())
+        return (
+            item for item in self._vs.values()
+            if vertex_type is None or item.get_prop("vertex_type") == vertex_type
+        )

-    def edges(self) -> Iterator[Edge]:
+    def edges(self, edge_type: Optional[str] = None) -> Iterator[Edge]:
         """Return edges."""
-        return iter(e for nbs in self._oes.values() for es in nbs.values() for e in es)
+        return (
+            e for nbs in self._oes.values()
+            for es in nbs.values()
+            for e in es
+            if edge_type is None or e.get_prop("edge_type") == edge_type
+        )

     def del_vertices(self, *vids: str):
         """Delete specified vertices."""
diff --git a/dbgpt/storage/graph_store/tugraph_store.py b/dbgpt/storage/graph_store/tugraph_store.py
index 3fdd2df8b..32a856014 100644
--- a/dbgpt/storage/graph_store/tugraph_store.py
+++ b/dbgpt/storage/graph_store/tugraph_store.py
@@ -36,11 +36,27 @@ class TuGraphStoreConfig(GraphStoreConfig):
     )
     vertex_type: str = Field(
         default="entity",
-        description="The type of vertex, `entity` by default.",
+        description="The type of entity vertex, `entity` by default.",
+    )
+    document_type: str = Field(
+        default="document",
+        description="The type of document vertex, `document` by default.",
+    )
+    chunk_type: str = Field(
+        default="chunk",
+        description="The type of chunk vertex, `chunk` by default.",
     )
     edge_type: str = Field(
         default="relation",
-        description="The type of edge, `relation` by default.",
+        description="The type of relation edge, `relation` by default.",
+    )
+    include_type: str = Field(
+        default="include",
+        description="The type of include edge, `include` by default.",
+    )
+    next_type: str = Field(
+        default="next",
+        description="The type of next edge, `next` by default.",
     )
     plugin_names: List[str] = Field(
         default=["leiden"],
@@ -73,6 +89,10 @@ def __init__(self, config: TuGraphStoreConfig) -> None:
         self._graph_name = config.name
         self._vertex_type = os.getenv("TUGRAPH_VERTEX_TYPE", config.vertex_type)
         self._edge_type = os.getenv("TUGRAPH_EDGE_TYPE", config.edge_type)
+        self._document_type = os.getenv("TUGRAPH_DOCUMENT_TYPE", config.document_type)
+        self._chunk_type = os.getenv("TUGRAPH_CHUNK_TYPE", config.chunk_type)
+        self._include_type = os.getenv("TUGRAPH_INCLUDE_TYPE", config.include_type)
+        self._next_type = os.getenv("TUGRAPH_NEXT_TYPE", config.next_type)

         self.conn = TuGraphConnector.from_uri_db(
             host=self._host,
@@ -91,19 +111,151 @@ def get_vertex_type(self) -> str:
     def get_edge_type(self) -> str:
         """Get the edge type."""
         return self._edge_type
+
+    def get_document_vertex(self, doc_name: str) -> Vertex:
+        """Get the document vertex by name."""
+        gql = f"MATCH (n) WHERE n.id = '{doc_name}' RETURN n"
+        graph = self.query(gql)
+        vertex = graph.get_vertex(doc_name)
+        return vertex
+
+    def delete_document(self, chunk_ids: str):
+        """Delete chunks, their relations, and orphaned vertices by chunk ids."""
+        chunkids_list = [chunk_id.strip() for chunk_id in chunk_ids.split(",")]
+        del_chunk_gql = f"MATCH (m:{self._document_type})-[r]->(n:{self._chunk_type}) WHERE n.id IN {chunkids_list} DELETE n"
+        del_relation_gql = f"MATCH (m:{self._vertex_type})-[r:{self._edge_type}]-(n:{self._vertex_type}) WHERE r._chunk_id IN {chunkids_list} DELETE r"
+        delete_only_vertex = "MATCH (n) WHERE NOT EXISTS((n)-[]-()) DELETE n"
+        self.conn.run(del_chunk_gql)
+        self.conn.run(del_relation_gql)
+        self.conn.run(delete_only_vertex)
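+
+    # Create the graph, then register the vertex labels (document, chunk,
+    # entity) and edge labels (relation, include, next) through TuGraph's
+    # JSON schema procedures.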
+    def _create_graph(self, graph_name: str):
         self.conn.create_graph(graph_name=graph_name)
-        self._create_schema()
+        document_properties = [
+            {"name": "id", "type": "STRING", "optional": False},
+            {"name": "name", "type": "STRING", "optional": False},
+            {"name": "_community_id", "type": "STRING", "optional": True, "index": True},
+        ]
+        self._create_schema(label_name=self._document_type, label_type="VERTEX", data=json.dumps({"label": self._document_type, "type": "VERTEX", "primary": "id", "properties": document_properties}))
+        chunk_properties = [
+            {"name": "id", "type": "STRING", "optional": False},
+            {"name": "name", "type": "STRING", "optional": False},
+            {"name": "_community_id", "type": "STRING", "optional": True, "index": True},
+            {"name": "content", "type": "STRING", "optional": True, "index": True},
+        ]
+        self._create_schema(label_name=self._chunk_type, label_type="VERTEX", data=json.dumps({"label": self._chunk_type, "type": "VERTEX", "primary": "id", "properties": chunk_properties}))
+        vertex_properties = [
+            {"name": "id", "type": "STRING", "optional": False},
+            {"name": "name", "type": "STRING", "optional": False},
+            {"name": "_community_id", "type": "STRING", "optional": True, "index": True},
+            {"name": "description", "type": "STRING", "optional": True, "index": True},
+        ]
+        self._create_schema(label_name=self._vertex_type, label_type="VERTEX", data=json.dumps({"label": self._vertex_type, "type": "VERTEX", "primary": "id", "properties": vertex_properties}))
+        edge_properties = [
+            {"name": "id", "type": "STRING", "optional": False},
+            {"name": "name", "type": "STRING", "optional": False},
+            {"name": "_chunk_id", "type": "STRING", "optional": True, "index": True},
+            {"name": "description", "type": "STRING", "optional": True, "index": True},
+        ]
+        self._create_schema(label_name=self._edge_type, label_type="EDGE", data=json.dumps({"label": self._edge_type, "type": "EDGE", "constraints": [[self._vertex_type, self._vertex_type]], "properties": edge_properties}))
+        include_properties = [
+            {"name": "id", "type": "STRING", "optional": False},
+            {"name": "name", "type": "STRING", "optional": False},
+            {"name": "description", "type": "STRING", "optional": True},
+        ]
+        self._create_schema(label_name=self._include_type, label_type="EDGE", data=json.dumps({"label": self._include_type, "type": "EDGE", "constraints": [[self._document_type, self._chunk_type], [self._chunk_type, self._chunk_type], [self._chunk_type, self._vertex_type]], "properties": include_properties}))
+        next_properties = [
+            {"name": "id", "type": "STRING", "optional": False},
+            {"name": "name", "type": "STRING", "optional": False},
+            {"name": "description", "type": "STRING", "optional": True},
+        ]
+        self._create_schema(label_name=self._next_type, label_type="EDGE", data=json.dumps({"label": self._next_type, "type": "EDGE", "constraints": [[self._chunk_type, self._chunk_type]], "properties": next_properties}))
         if self._summary_enabled:
             self._upload_plugin()

     def _check_label(self, elem_type: str):
         result = self.conn.get_table_names()
-        if elem_type == "vertex":
+        if elem_type == "entity":
             return self._vertex_type in result["vertex_tables"]
-        if elem_type == "edge":
+        if elem_type == "chunk":
+            return self._chunk_type in result["vertex_tables"]
+        if elem_type == "document":
+            return self._document_type in result["vertex_tables"]
+        if elem_type == "relation":
             return self._edge_type in result["edge_tables"]
"include": + return self._include_type in result["edge_tables"] + if elem_type == "next": + return self._next_type in result["edge_tables"] def _add_vertex_index(self, field_name): gql = f"CALL db.addIndex('{self._vertex_type}', '{field_name}', false)" @@ -141,47 +293,14 @@ def _upload_plugin(self): ) self.conn.run(gql) - def _create_schema(self): - if not self._check_label("vertex"): - if self._summary_enabled: - create_vertex_gql = ( - f"CALL db.createLabel(" - f"'vertex', '{self._vertex_type}', " - f"'id', ['id',string,false]," - f"['name',string,false]," - f"['_document_id',string,true]," - f"['_chunk_id',string,true]," - f"['_community_id',string,true]," - f"['description',string,true])" - ) - self.conn.run(create_vertex_gql) - self._add_vertex_index("_community_id") + def _create_schema(self,label_name:str, label_type:str, data:Any): + if not self._check_label(label_name): + if label_type == 'VERTEX': + gql = f'''CALL db.createVertexLabelByJson('{data}')''' else: - create_vertex_gql = ( - f"CALL db.createLabel(" - f"'vertex', '{self._vertex_type}', " - f"'id', ['id',string,false]," - f"['name',string,false])" - ) - self.conn.run(create_vertex_gql) - - if not self._check_label("edge"): - create_edge_gql = f"""CALL db.createLabel( - 'edge', '{self._edge_type}', - '[["{self._vertex_type}", - "{self._vertex_type}"]]', - ["id",STRING,false], - ["name",STRING,false])""" - if self._summary_enabled: - create_edge_gql = f"""CALL db.createLabel( - 'edge', '{self._edge_type}', - '[["{self._vertex_type}", - "{self._vertex_type}"]]', - ["id",STRING,false], - ["name",STRING,false], - ["description",STRING,true])""" - self.conn.run(create_edge_gql) - + gql = f'''CALL db.createEdgeLabelByJson('{data}')''' + self.conn.run(gql) + def _format_query_data(self, data, white_prop_list: List[str]): nodes_list = [] rels_list: List[Any] = [] @@ -199,14 +318,16 @@ def get_filtered_properties(properties, white_list): def process_node(node: graph.Node): node_id = node._properties.get("id") node_name = node._properties.get("name") + node_type = next(iter(node._labels)) node_properties = get_filtered_properties(node._properties, _white_list) nodes_list.append( - {"id": node_id, "name": node_name, "properties": node_properties} + {"id": node_id,"type":node_type, "name": node_name, "properties": node_properties} ) def process_relationship(rel: graph.Relationship): name = rel._properties.get("name", "") rel_nodes = rel.nodes + rel_type = rel.type src_id = rel_nodes[0]._properties.get("id") dst_id = rel_nodes[1]._properties.get("id") for node in rel_nodes: @@ -223,6 +344,7 @@ def process_relationship(rel: graph.Relationship): "src_id": src_id, "dst_id": dst_id, "name": name, + "type":rel_type, "properties": edge_properties, } ) @@ -239,6 +361,7 @@ def process_other(value): { "id": "json_node", "name": "json_node", + "type": "json_node", "properties": {"description": value}, } ) @@ -255,15 +378,32 @@ def process_other(value): else: process_other(value) nodes = [ - Vertex(node["id"], node["name"], **node["properties"]) + Vertex(node["id"], node["name"], **{"type": node["type"], **node["properties"]}) for node in nodes_list ] rels = [ - Edge(edge["src_id"], edge["dst_id"], edge["name"], **edge["properties"]) + Edge(edge["src_id"], edge["dst_id"],edge["name"], **{"type": edge["type"], **edge["properties"]}) for edge in rels_list ] return {"nodes": nodes, "edges": rels} + def _escape_quotes(self, value: str) -> str: + """Escape single and double quotes in a string for queries.""" + if value is not None: + return 
value.replace("'", "").replace('"', "") + + def _parser(self, entity_list): + formatted_nodes = [ + "{" + + ", ".join( + f'{k}: "{v}"' if isinstance(v, str) else f"{k}: {v}" + for k, v in node.items() + ) + + "}" + for node in entity_list + ] + return f"""{', '.join(formatted_nodes)}""" + def get_config(self): """Get the graph store config.""" return self._config @@ -303,63 +443,83 @@ def escape_quotes(value: str) -> str: self.conn.run(query=node_query) self.conn.run(query=edge_query) - def insert_graph(self, graph: Graph) -> None: - """Add graph.""" + def _upsert_entities(self, entities): + entity_list = [{ + "id": self._escape_quotes(entity.vid), + "name": self._escape_quotes(entity.name), + "description": self._escape_quotes(entity.get_prop("description")) or "", + "_document_id": "0", + "_chunk_id": "0", + "_community_id": "0", + } for entity in entities] + entity_query = ( + f"""CALL db.upsertVertex("{self._vertex_type}", [{self._parser(entity_list)}])""" + ) + self.conn.run(query=entity_query) - def escape_quotes(value: str) -> str: - """Escape single and double quotes in a string for queries.""" - if value is not None: - return value.replace("'", "").replace('"', "") + def _upsert_chunks(self, chunks): + chunk_list = [{ "id": self._escape_quotes(chunk.vid),"name": self._escape_quotes(chunk.name),"content": self._escape_quotes(chunk.get_prop('content'))} for chunk in chunks] + chunk_query = ( + f"""CALL db.upsertVertex("{self._chunk_type}", [{self._parser(chunk_list)}])""" + ) + self.conn.run(query=chunk_query) - nodes: Iterator[Vertex] = graph.vertices() - edges: Iterator[Edge] = graph.edges() - node_list = [] - edge_list = [] + def _upsert_documents(self, documents): + document_list = [{ "id": self._escape_quotes(document.vid),"name": self._escape_quotes(document.name), "content": self._escape_quotes(document.get_prop('content')) or ''} for document in documents] + document_query = ( + f"""CALL db.upsertVertex("{self._document_type}", [{self._parser(document_list)}])""" + ) + self.conn.run(query=document_query) + + def _upsert_edge(self, edges, edge_type, src_type, dst_type): + edge_list = [{ + "sid": self._escape_quotes(edge.sid), + "tid": self._escape_quotes(edge.tid), + "id": self._escape_quotes(edge.name), + "name": self._escape_quotes(edge.name), + "description": self._escape_quotes(edge.get_prop("description")) or "", + "_chunk_id": self._escape_quotes(edge.get_prop("_chunk_id")) or "", + } for edge in edges] + relation_query = ( + f"""CALL db.upsertEdge("{edge_type}", + {{type:"{src_type}", key:"sid"}}, + {{type:"{dst_type}", key:"tid"}}, + [{self._parser(edge_list)}])""" + ) + self.conn.run(query=relation_query) + - def parser(node_list): - formatted_nodes = [ - "{" - + ", ".join( - f'{k}: "{v}"' if isinstance(v, str) else f"{k}: {v}" - for k, v in node.items() - ) - + "}" - for node in node_list - ] - return f"""{', '.join(formatted_nodes)}""" + def _upsert_chunk_include_chunk(self,edges): + pass + + def _upsert_chunk_include_entity(self,edges): + pass + + def _upsert_relation(self,edges): + pass + + def insert_graph(self, graph: Graph) -> None: + # This part of the code needs optimization. 
+ """Add graph.""" + + documents: Iterator[Vertex] = graph.vertices('document') + doc_include_chunk: Iterator[Edge] = graph.edges('document_include_chunk') + chunks: Iterator[Vertex] = graph.vertices('chunk') + chunk_include_chunk: Iterator[Edge] = graph.edges('chunk_include_chunk') + chunk_next_chunk: Iterator[Edge] = graph.edges('chunk_next_chunk') + entities: Iterator[Vertex] = graph.vertices('entity') + chunk_include_entity: Iterator[Edge] = graph.edges('chunk_include_entity') + relation: Iterator[Edge] = graph.edges('relation') + self._upsert_entities(entities) + self._upsert_chunks(chunks) + self._upsert_documents(documents) + self._upsert_edge(doc_include_chunk, self._include_type, self._document_type, self._chunk_type) + self._upsert_edge(chunk_include_chunk, self._include_type, self._chunk_type, self._chunk_type) + self._upsert_edge(chunk_include_entity, self._include_type, self._chunk_type, self._vertex_type) + self._upsert_edge(chunk_next_chunk, self._next_type, self._chunk_type, self._chunk_type) + self._upsert_edge(relation, self._edge_type, self._vertex_type, self._vertex_type) - for node in nodes: - node_list.append( - { - "id": escape_quotes(node.vid), - "name": escape_quotes(node.name), - "description": escape_quotes(node.get_prop("description")) or "", - "_document_id": "0", - "_chunk_id": "0", - "_community_id": "0", - } - ) - node_query = ( - f"""CALL db.upsertVertex("{self._vertex_type}", [{parser(node_list)}])""" - ) - for edge in edges: - edge_list.append( - { - "sid": escape_quotes(edge.sid), - "tid": escape_quotes(edge.tid), - "id": escape_quotes(edge.name), - "name": escape_quotes(edge.name), - "description": escape_quotes(edge.get_prop("description")), - } - ) - edge_query = f"""CALL db.upsertEdge( - "{self._edge_type}", - {{type:"{self._vertex_type}", key:"sid"}}, - {{type:"{self._vertex_type}", key:"tid"}}, - [{parser(edge_list)}])""" - self.conn.run(query=node_query) - self.conn.run(query=edge_query) def truncate(self): """Truncate Graph.""" @@ -435,6 +595,32 @@ def explore( f"WHERE n.id IN {subs} RETURN p {limit_string}" ) return self.query(query) + + def explore_text_link( + self, + subs:List[str], + depth: Optional[int] = None, + limit: Optional[int] = None, + ) -> Graph: + """Explore the graph text link.""" + if not subs: + return MemoryGraph() + depth_string = f"1..{depth}" + if depth is None: + depth_string = ".." 
+ limit_string = f"LIMIT {limit}" + if limit is None: + limit_string = "" + graph = MemoryGraph() + for sub in subs: + query = f"MATCH p=(n:{self._document_type})-[r:{self._include_type}*{depth_string}]-(m:{self._chunk_type})WHERE m.content CONTAINS '{sub}' RETURN p {limit_string}" + result = self.query(query) + for vertex in result.vertices(): + graph.upsert_vertex(vertex) + for edge in result.edges(): + graph.append_edge(edge) + + return graph def query(self, query: str, **args) -> MemoryGraph: """Execute a query on graph.""" diff --git a/dbgpt/storage/knowledge_graph/base.py b/dbgpt/storage/knowledge_graph/base.py index e47094bba..1447c22e6 100644 --- a/dbgpt/storage/knowledge_graph/base.py +++ b/dbgpt/storage/knowledge_graph/base.py @@ -29,4 +29,6 @@ def query_graph(self, limit: Optional[int] = None) -> Graph: def delete_by_ids(self, ids: str) -> List[str]: """Delete document by ids.""" - raise Exception("Delete document not supported by knowledge graph") + # raise Exception("Delete document not supported by knowledge graph") + + diff --git a/dbgpt/storage/knowledge_graph/community_summary.py b/dbgpt/storage/knowledge_graph/community_summary.py index a5bf272ac..258144abf 100644 --- a/dbgpt/storage/knowledge_graph/community_summary.py +++ b/dbgpt/storage/knowledge_graph/community_summary.py @@ -2,14 +2,16 @@ import logging import os +import uuid from typing import List, Optional - +import asyncio from dbgpt._private.pydantic import ConfigDict, Field from dbgpt.core import Chunk from dbgpt.rag.transformer.community_summarizer import CommunitySummarizer from dbgpt.rag.transformer.graph_extractor import GraphExtractor from dbgpt.storage.knowledge_graph.community.community_store import CommunityStore from dbgpt.storage.knowledge_graph.community.factory import CommunityStoreAdapterFactory +from dbgpt.storage.graph_store.graph import Edge, Graph, MemoryGraph, Vertex from dbgpt.storage.knowledge_graph.knowledge_graph import ( BuiltinKnowledgeGraph, BuiltinKnowledgeGraphConfig, @@ -132,20 +134,95 @@ def community_store_configure(name: str, cfg: VectorStoreConfig): def get_config(self) -> BuiltinKnowledgeGraphConfig: """Get the knowledge graph config.""" return self._config + + def _parse_chunks(slef, chunks: List[Chunk]): + data = [] + for chunk_index, chunk in enumerate(chunks): + parent = None + directory_keys = list(chunk.metadata.keys())[:-1] + parent_level = directory_keys[-2] if len(directory_keys) > 1 else None + self_level = directory_keys[-1] if directory_keys else 'Header0' + + obj = { + 'id': chunk.chunk_id, + 'title': chunk.metadata.get(self_level, 'none_header_chunk'), + 'directory_keys': directory_keys, + 'level': self_level, + 'content': chunk.content, + 'parent_id': None, + 'parent_title':None, + 'type':'chunk', + 'chunk_index':chunk_index + } + + if parent_level: + for parent_direct in directory_keys[:-1][::-1]: + parent_name = chunk.metadata.get(parent_direct, None) + for n in range(chunk_index-1, -1, -1): + metadata = chunks[n].metadata + keys = list(metadata.keys())[:-1] + if metadata and parent_direct == keys[-1] and parent_name == metadata.get(parent_direct): + parent = chunks[n] + obj['parent_id'] = parent.chunk_id + obj['parent_title'] = parent_name + break + if chunk_index - n > len(directory_keys): + break + if obj['parent_id']: + break + + if not obj['parent_id']: + obj['parent_id'] = 'document' + data.append(obj) + return data async def aload_document(self, chunks: List[Chunk]) -> List[str]: """Extract and persist graph.""" - # todo add doc node - for chunk in 
-            # todo add chunk node
-            # todo add relation doc-chunk
+        # check document
+        file_path = chunks[0].metadata.get("source") or "Text_Node"
+        doc_name = os.path.basename(file_path)
+        hash_id = str(uuid.uuid4())
+
+        data_list = self._parse_chunks(chunks)
+        total_graph = MemoryGraph()
+
+        for index, data in enumerate(data_list):
+            chunk_src = Vertex(data["parent_id"], name=data["parent_title"], vertex_type=data["type"], content=data["content"])
+            chunk_dst = Vertex(data["id"], name=data["title"], vertex_type=data["type"], content=data["content"])
+            chunk_include_chunk = Edge(chunk_src.vid, chunk_dst.vid, name="include", edge_type="chunk_include_chunk")
+            chunk_next_chunk = None
+            if index >= 1:
+                chunk_next_chunk = Edge(data_list[index - 1]["id"], data["id"], name="next", edge_type="chunk_next_chunk")
+            if data["parent_id"] == "document":
+                chunk_src = Vertex(hash_id, name=doc_name, vertex_type="document", content=data["content"])
+                chunk_include_chunk = Edge(chunk_src.vid, chunk_dst.vid, name="include", edge_type="document_include_chunk")
+            total_graph.upsert_vertex(chunk_src)
+            total_graph.upsert_vertex(chunk_dst)
+            total_graph.append_edge(chunk_include_chunk)
+            if chunk_next_chunk:
+                total_graph.append_edge(chunk_next_chunk)
+            if os.getenv("ONLY_EXTRACT_DOCUMENT_STRUCTURE", "false").lower() != "true":
+                graphs = await self._graph_extractor.extract(data["content"])
+                for graph in graphs:
+                    for vertex in graph.vertices():
+                        total_graph.upsert_vertex(vertex)
+                        chunk_include_entity = Edge(chunk_dst.vid, vertex.vid, name="include", edge_type="chunk_include_entity")
+                        total_graph.append_edge(chunk_include_entity)
+                    for edge in graph.edges():
+                        edge.set_prop("_chunk_id", chunk_dst.vid)
+                        total_graph.append_edge(edge)
+
+        self._graph_store.insert_graph(total_graph)

-        # extract graphs and save
-        graphs = await self._graph_extractor.extract(chunk.content)
-        for graph in graphs:
-            self._graph_store.insert_graph(graph)
+        # TODO: extract chunks concurrently with asyncio.gather, e.g.:
+        # tasks = [self._graph_extractor.extract(chunk.content) for chunk in chunks]
+        # results = await asyncio.gather(*tasks)
+        # for result in results:
+        #     self._graph_store.insert_graph(result[0])

         # build communities and save
         await self._community_store.build_communities()
         return [chunk.chunk_id for chunk in chunks]
@@ -169,13 +246,14 @@ async def asimilar_search_with_scores(
         # local search: extract keywords and explore subgraph
         keywords = await self._keyword_extractor.extract(text)
         subgraph = self._graph_store.explore(keywords, limit=topk).format()
+        document_subgraph = self._graph_store.explore_text_link(keywords, limit=topk).format()
         logger.info(f"Search subgraph from {len(keywords)} keywords")

         if not summaries and not subgraph:
             return []

         # merge search results into context
-        content = HYBRID_SEARCH_PT_CN.format(context=context, graph=subgraph)
+        content = HYBRID_SEARCH_PT_CN.format(context=context, graph=subgraph, document_subgraph=document_subgraph)
         return [Chunk(content=content)]

     def truncate(self) -> List[str]:
diff --git a/dbgpt/storage/knowledge_graph/knowledge_graph.py b/dbgpt/storage/knowledge_graph/knowledge_graph.py
index 066e2667d..d22ebbb07 100644
--- a/dbgpt/storage/knowledge_graph/knowledge_graph.py
+++ b/dbgpt/storage/knowledge_graph/knowledge_graph.py
@@ -117,6 +117,7 @@ async def asimilar_search_with_scores(
         # extract keywords and explore graph store
         keywords = await self._keyword_extractor.extract(text)
         subgraph = self._graph_store.explore(keywords, limit=topk).format()
+
         logger.info(f"Search subgraph from {len(keywords)} keywords")

         if not subgraph:
@@ -172,3 +173,7 @@ def delete_vector_name(self, index_name: str):
         logger.info("Drop triplet extractor")
         self._triplet_extractor.drop()
+
+    def delete_by_ids(self, ids: str) -> List[str]:
+        """Delete document chunks and their graph data by chunk ids."""
+        self._graph_store.delete_document(chunk_ids=ids)
+        return []