diff --git a/dbgpt/cli/cli_scripts.py b/dbgpt/cli/cli_scripts.py index 5b170f9c2..d1c9fa368 100644 --- a/dbgpt/cli/cli_scripts.py +++ b/dbgpt/cli/cli_scripts.py @@ -201,7 +201,7 @@ def stop_all(): logging.warning(f"Integrating dbgpt dbgpts command line tool failed: {e}") try: - from dbgpt.client._cli import run_flow + from dbgpt.client._cli import flow as run_flow add_command_alias(run_flow, name="flow", parent_group=run) except ImportError as e: diff --git a/dbgpt/client/_cli.py b/dbgpt/client/_cli.py index fa73aa69e..3dd29bbbe 100644 --- a/dbgpt/client/_cli.py +++ b/dbgpt/client/_cli.py @@ -1,22 +1,55 @@ """CLI for DB-GPT client.""" +import asyncio import functools import json import time import uuid -from typing import Any, Dict +from typing import Any, AsyncIterator, Callable, Dict, Tuple, cast import click +from dbgpt.component import SystemApp +from dbgpt.core.awel import DAG, BaseOperator, DAGVar +from dbgpt.core.awel.dag.dag_manager import DAGMetadata, _parse_metadata +from dbgpt.core.awel.flow.flow_factory import FlowFactory from dbgpt.util import get_or_create_event_loop from dbgpt.util.console import CliLogger from dbgpt.util.i18n_utils import _ from .client import Client from .flow import list_flow +from .flow import run_flow_cmd as client_run_flow_cmd cl = CliLogger() +_LOCAL_MODE: bool | None = False +_FILE_PATH: str | None = None + + +@click.group() +@click.option( + "--local", + required=False, + type=bool, + default=False, + is_flag=True, + help="Whether use local mode(run local AWEL file)", +) +@click.option( + "-f", + "--file", + type=str, + default=None, + required=False, + help=_("The path of the AWEL flow"), +) +def flow(local: bool = False, file: str | None = None): + """Run a AWEL flow.""" + global _LOCAL_MODE, _FILE_PATH + _LOCAL_MODE = local + _FILE_PATH = file + def add_base_flow_options(func): """Add base flow options to the command.""" @@ -124,32 +157,229 @@ def _wrapper(*args, **kwargs): return _wrapper -@click.command(name="flow") 
+@flow.command(name="chat") @add_base_flow_options @add_chat_options -def run_flow(name: str, uid: str, data: str, interactive: bool, **kwargs): +def run_flow_chat(name: str, uid: str, data: str, interactive: bool, **kwargs): """Run a AWEL flow.""" + json_data = _parse_chat_json_data(data, **kwargs) + stream = "stream" in json_data and str(json_data["stream"]).lower() in ["true", "1"] + loop = get_or_create_event_loop() + if _LOCAL_MODE: + _run_flow_chat_local(loop, name, interactive, json_data, stream) + return + client = Client() - loop = get_or_create_event_loop() - res = loop.run_until_complete(list_flow(client, name, uid)) + # AWEL flow store the python module name now, so we need to replace "-" with "_" + new_name = name.replace("-", "_") + res = loop.run_until_complete(list_flow(client, new_name, uid)) if not res: cl.error("Flow not found with the given name or uid", exit_code=1) if len(res) > 1: cl.error("More than one flow found", exit_code=1) flow = res[0] - json_data = _parse_json_data(data, **kwargs) json_data["chat_param"] = flow.uid json_data["chat_mode"] = "chat_flow" - stream = "stream" in json_data and str(json_data["stream"]).lower() in ["true", "1"] if stream: - loop.run_until_complete(_chat_stream(client, interactive, json_data)) + _run_flow_chat_stream(loop, client, interactive, json_data) else: - loop.run_until_complete(_chat(client, interactive, json_data)) + _run_flow_chat(loop, client, interactive, json_data) + + +@flow.command(name="cmd") +@add_base_flow_options +@click.option( + "-d", + "--data", + type=str, + default=None, + required=False, + help=_("The json data to run AWEL flow, if set, will overwrite other options"), +) +@click.option( + "--output_key", + type=str, + default=None, + required=False, + help=_( + "The output key of the AWEL flow, if set, it will try to get the output by the " + "key" + ), +) +def run_flow_cmd( + name: str, uid: str, data: str | None = None, output_key: str | None = None +): + """Run a AWEL flow with 
command mode.""" + json_data = _parse_json_data(data) + loop = get_or_create_event_loop() + + if _LOCAL_MODE: + _run_flow_cmd_local(loop, name, json_data, output_key) + else: + _run_flow_cmd(loop, name, uid, json_data, output_key) + + +def _run_flow_cmd_local( + loop: asyncio.BaseEventLoop, + name: str, + data: Dict[str, Any] | None = None, + output_key: str | None = None, +): + from dbgpt.core.awel.util.chat_util import safe_chat_stream_with_dag_task + + end_node, dag, dag_metadata, call_body = _parse_and_check_local_dag( + name, _FILE_PATH, data + ) + + async def _streaming_call(): + start_time = time.time() + try: + cl.debug("[~info] Flow started") + cl.debug(f"[~info] JSON data: {json.dumps(data, ensure_ascii=False)}") + cl.debug("Command output: ") + async for out in safe_chat_stream_with_dag_task( + end_node, call_body, incremental=True, covert_to_str=True + ): + if not out.success: + cl.error(out.text) + else: + cl.print(out.text, end="") + except Exception as e: + cl.error(f"Failed to run flow: {e}", exit_code=1) + finally: + time_cost = round(time.time() - start_time, 2) + cl.success(f"\n:tada: Flow finished, timecost: {time_cost} s") + + loop.run_until_complete(_streaming_call()) + + +def _run_flow_cmd( + loop: asyncio.BaseEventLoop, + name: str | None = None, + uid: str | None = None, + json_data: Dict[str, Any] | None = None, + output_key: str | None = None, +): + client = Client() + + def _non_streaming_callback(text: str): + parsed_text: Any = None + if output_key: + try: + json_out = json.loads(text) + parsed_text = json_out.get(output_key) + except Exception as e: + cl.warning(f"Failed to parse output by key: {output_key}, {e}") + if not parsed_text: + parsed_text = text + cl.markdown(parsed_text) + + def _streaming_callback(text: str): + cl.print(text, end="") + + async def _client_run_cmd(): + cl.debug("[~info] Flow started") + cl.debug(f"[~info] JSON data: {json.dumps(json_data, ensure_ascii=False)}") + cl.debug("Command output: ") + start_time = 
time.time() + # AWEL flow store the python module name now, so we need to replace "-" with "_" + new_name = name.replace("-", "_") + try: + await client_run_flow_cmd( + client, + new_name, + uid, + json_data, + non_streaming_callback=_non_streaming_callback, + streaming_callback=_streaming_callback, + ) + except Exception as e: + cl.error(f"Failed to run flow: {e}", exit_code=1) + finally: + time_cost = round(time.time() - start_time, 2) + cl.success(f"\n:tada: Flow finished, timecost: {time_cost} s") + + loop.run_until_complete(_client_run_cmd()) -def _parse_json_data(data: str, **kwargs): +def _parse_and_check_local_dag( + name: str, + filepath: str | None = None, + data: Dict[str, Any] | None = None, +) -> Tuple[BaseOperator, DAG, DAGMetadata, Any]: + + dag, dag_metadata = _parse_local_dag(name, filepath) + + return _check_local_dag(dag, dag_metadata, data) + + +def _check_local_dag( + dag: DAG, dag_metadata: DAGMetadata, data: Dict[str, Any] | None = None +) -> Tuple[BaseOperator, DAG, DAGMetadata, Any]: + from dbgpt.core.awel import HttpTrigger + + leaf_nodes = dag.leaf_nodes + if not leaf_nodes: + cl.error("No leaf nodes found in the flow", exit_code=1) + if len(leaf_nodes) > 1: + cl.error("More than one leaf nodes found in the flow", exit_code=1) + if not isinstance(leaf_nodes[0], BaseOperator): + cl.error("Unsupported leaf node type", exit_code=1) + end_node = cast(BaseOperator, leaf_nodes[0]) + call_body: Any = data + trigger_nodes = dag.trigger_nodes + if trigger_nodes: + if len(trigger_nodes) > 1: + cl.error("More than one trigger nodes found in the flow", exit_code=1) + trigger = trigger_nodes[0] + if isinstance(trigger, HttpTrigger): + http_trigger = trigger + if http_trigger._req_body and data: + call_body = http_trigger._req_body(**data) + else: + cl.error("Unsupported trigger type", exit_code=1) + return end_node, dag, dag_metadata, call_body + + +def _parse_local_dag(name: str, filepath: str | None = None) -> Tuple[DAG, DAGMetadata]: + + system_app 
= SystemApp() + DAGVar.set_current_system_app(system_app) + + if not filepath: + # Load DAG from installed package(dbgpts) + from dbgpt.util.dbgpts.loader import ( + _flow_package_to_flow_panel, + _load_flow_package_from_path, + ) + + flow_panel = _flow_package_to_flow_panel(_load_flow_package_from_path(name)) + if flow_panel.define_type == "json": + factory = FlowFactory() + factory.pre_load_requirements(flow_panel) + dag = factory.build(flow_panel) + else: + dag = flow_panel.flow_dag + return dag, _parse_metadata(dag) + else: + from dbgpt.core.awel.dag.loader import _process_file + + dags = _process_file(filepath) + if not dags: + cl.error("No DAG found in the file", exit_code=1) + if len(dags) > 1: + dags = [dag for dag in dags if dag.dag_id == name] + # Filter by name + if len(dags) > 1: + cl.error("More than one DAG found in the file", exit_code=1) + if not dags: + cl.error("No DAG found with the given name", exit_code=1) + return dags[0], _parse_metadata(dags[0]) + + +def _parse_chat_json_data(data: str, **kwargs): json_data = {} if data: try: @@ -170,7 +400,100 @@ def _parse_json_data(data: str, **kwargs): return json_data -async def _chat_stream(client: Client, interactive: bool, json_data: Dict[str, Any]): +def _parse_json_data(data: str | None) -> Dict[str, Any] | None: + if not data: + return None + try: + return json.loads(data) + except Exception as e: + cl.error(f"Invalid JSON data: {data}, {e}", exit_code=1) + # Should not reach here + return None + + +def _run_flow_chat_local( + loop: asyncio.BaseEventLoop, + name: str, + interactive: bool, + json_data: Dict[str, Any], + stream: bool, +): + from dbgpt.core.awel.util.chat_util import ( + parse_single_output, + safe_chat_stream_with_dag_task, + ) + + dag, dag_metadata = _parse_local_dag(name, _FILE_PATH) + + async def _streaming_call(_call_body: Dict[str, Any]): + nonlocal dag, dag_metadata + + end_node, dag, dag_metadata, handled_call_body = _check_local_dag( + dag, dag_metadata, _call_body + ) + 
async for out in safe_chat_stream_with_dag_task( + end_node, handled_call_body, incremental=True, covert_to_str=True + ): + if not out.success: + cl.error(f"Error: {out.text}") + raise Exception(out.text) + else: + yield out.text + + async def _call(_call_body: Dict[str, Any]): + nonlocal dag, dag_metadata + + end_node, dag, dag_metadata, handled_call_body = _check_local_dag( + dag, dag_metadata, _call_body + ) + res = await end_node.call(handled_call_body) + parsed_res = parse_single_output(res, is_sse=False, covert_to_str=True) + if not parsed_res.success: + raise Exception(parsed_res.text) + return parsed_res.text + + if stream: + loop.run_until_complete(_chat_stream(_streaming_call, interactive, json_data)) + else: + loop.run_until_complete(_chat(_call, interactive, json_data)) + + +def _run_flow_chat_stream( + loop: asyncio.BaseEventLoop, + client: Client, + interactive: bool, + json_data: Dict[str, Any], +): + async def _streaming_call(_call_body: Dict[str, Any]): + async for out in client.chat_stream(**_call_body): + if out.choices: + text = out.choices[0].delta.content + if text: + yield text + + loop.run_until_complete(_chat_stream(_streaming_call, interactive, json_data)) + + +def _run_flow_chat( + loop: asyncio.BaseEventLoop, + client: Client, + interactive: bool, + json_data: Dict[str, Any], +): + async def _call(_call_body: Dict[str, Any]): + res = await client.chat(**_call_body) + if res.choices: + text = res.choices[0].message.content + return text + + loop.run_until_complete(_chat(_call, interactive, json_data)) + + +async def _chat_stream( + streaming_func: Callable[[Dict[str, Any]], AsyncIterator[str]], + interactive: bool, + json_data: Dict[str, Any], +): user_input = json_data.get("messages", "") if "conv_uid" not in json_data and interactive: json_data["conv_uid"] = str(uuid.uuid4()) @@ -187,16 +510,14 @@ async def _chat_stream(client: Client, interactive: bool, json_data: Dict[str, A json_data["messages"] = user_input if first_message: 
cl.info("You: " + user_input) - cl.info("Chat stream started") - cl.debug(f"JSON data: {json.dumps(json_data, ensure_ascii=False)}") + cl.debug("[~info] Chat stream started") + cl.debug(f"[~info] JSON data: {json.dumps(json_data, ensure_ascii=False)}") full_text = "" cl.print("Bot: ") - async for out in client.chat_stream(**json_data): - if out.choices: - text = out.choices[0].delta.content - if text: - full_text += text - cl.print(text, end="") + async for text in streaming_func(json_data): + if text: + full_text += text + cl.print(text, end="") end_time = time.time() time_cost = round(end_time - start_time, 2) cl.success(f"\n:tada: Chat stream finished, timecost: {time_cost} s") @@ -210,7 +531,11 @@ async def _chat_stream(client: Client, interactive: bool, json_data: Dict[str, A break -async def _chat(client: Client, interactive: bool, json_data: Dict[str, Any]): +async def _chat( + func: Callable[[Dict[str, Any]], Any], + interactive: bool, + json_data: Dict[str, Any], +): user_input = json_data.get("messages", "") if "conv_uid" not in json_data and interactive: json_data["conv_uid"] = str(uuid.uuid4()) @@ -228,17 +553,19 @@ async def _chat(client: Client, interactive: bool, json_data: Dict[str, Any]): if first_message: cl.info("You: " + user_input) - cl.info("Chat started") - cl.debug(f"JSON data: {json.dumps(json_data, ensure_ascii=False)}") - res = await client.chat(**json_data) + cl.debug("[~info] Chat started") + cl.debug(f"[~info] JSON data: {json.dumps(json_data, ensure_ascii=False)}") + res = await func(json_data) cl.print("Bot: ") - if res.choices: - text = res.choices[0].message.content - cl.markdown(text) + if res: + cl.markdown(res) time_cost = round(time.time() - start_time, 2) cl.success(f"\n:tada: Chat stream finished, timecost: {time_cost} s") except Exception as e: - cl.error(f"Chat failed: {e}", exit_code=1) + import traceback + + messages = traceback.format_exc() + cl.error(f"Chat failed: {e}\n, error detail: {messages}", exit_code=1) finally: 
first_message = False if interactive: diff --git a/dbgpt/client/client.py b/dbgpt/client/client.py index da5f235d8..ede0561bf 100644 --- a/dbgpt/client/client.py +++ b/dbgpt/client/client.py @@ -1,4 +1,5 @@ """This module contains the client for the DB-GPT API.""" + import atexit import json import os @@ -102,6 +103,15 @@ def __init__( ) atexit.register(self.close) + def _base_url(self): + parsed_url = urlparse(self._api_url) + host = parsed_url.hostname + scheme = parsed_url.scheme + port = parsed_url.port + if port: + return f"{scheme}://{host}:{port}" + return f"{scheme}://{host}" + async def chat( self, model: str, diff --git a/dbgpt/client/flow.py b/dbgpt/client/flow.py index ec1b7edb4..a61c37f06 100644 --- a/dbgpt/client/flow.py +++ b/dbgpt/client/flow.py @@ -1,5 +1,8 @@ """this module contains the flow client functions.""" -from typing import List + +from typing import Any, Callable, Dict, List + +from httpx import AsyncClient from dbgpt.core.awel.flow.flow_factory import FlowPanel from dbgpt.core.schema.api import Result @@ -117,3 +120,181 @@ async def list_flow( raise ClientException(status=result["err_code"], reason=result) except Exception as e: raise ClientException(f"Failed to list flows: {e}") + + +async def run_flow_cmd( + client: Client, + name: str | None = None, + uid: str | None = None, + data: Dict[str, Any] | None = None, + non_streaming_callback: Callable[[str], None] | None = None, + streaming_callback: Callable[[str], None] | None = None, +) -> None: + """ + Run flows. + + Args: + client (Client): The dbgpt client. + name (str): The name of the flow. + uid (str): The uid of the flow. + data (Dict[str, Any]): The data to run the flow. + non_streaming_callback (Callable[[str], None]): The non-streaming callback. + streaming_callback (Callable[[str], None]): The streaming callback. + Returns: + List[FlowPanel]: The list of flow panels. + Raises: + ClientException: If the request failed. 
+ """ + try: + res = await client.get("/awel/flows", **{"name": name, "uid": uid}) + result: Result = res.json() + if not result["success"]: + raise ClientException("Flow not found with the given name or uid") + flows = result["data"]["items"] + if not flows: + raise ClientException("Flow not found with the given name or uid") + if len(flows) > 1: + raise ClientException("More than one flow found") + flow = flows[0] + flow_panel = FlowPanel(**flow) + metadata = flow.get("metadata") + await _run_flow_trigger( + client, + flow_panel, + metadata, + data, + non_streaming_callback=non_streaming_callback, + streaming_callback=streaming_callback, + ) + except Exception as e: + raise ClientException(f"Failed to run flows: {e}") + + +async def _run_flow_trigger( + client: Client, + flow: FlowPanel, + metadata: Dict[str, Any] | None = None, + data: Dict[str, Any] | None = None, + non_streaming_callback: Callable[[str], None] | None = None, + streaming_callback: Callable[[str], None] | None = None, +): + if not metadata: + raise ClientException("No AWEL flow metadata found") + if "triggers" not in metadata: + raise ClientException("No triggers found in AWEL flow metadata") + triggers = metadata["triggers"] + if len(triggers) > 1: + raise ClientException("More than one trigger found") + trigger = triggers[0] + sse_output = metadata.get("sse_output", False) + streaming_output = metadata.get("streaming_output", False) + trigger_type = trigger["trigger_type"] + if trigger_type == "http": + methods = trigger["methods"] + if not methods: + method = "GET" + else: + method = methods[0] + path = trigger["path"] + base_url = client._base_url() + req_url = f"{base_url}{path}" + if streaming_output: + await _call_stream_request( + client._http_client, + method, + req_url, + sse_output, + data, + streaming_callback, + ) + elif non_streaming_callback: + await _call_non_stream_request( + client._http_client, method, req_url, data, non_streaming_callback + ) + else: + raise 
ClientException(f"Invalid trigger type: {trigger_type}") + + +async def _call_non_stream_request( + http_client: AsyncClient, + method: str, + base_url: str, + data: Dict[str, Any] | None = None, + non_streaming_callback: Callable[[str], None] | None = None, +): + import httpx + + kwargs: Dict[str, Any] = {"url": base_url, "method": method} + if method in ["POST", "PUT"]: + kwargs["json"] = data + else: + kwargs["params"] = data + response = await http_client.request(**kwargs) + bytes_response_content = await response.aread() + if response.status_code != 200: + str_error_message = "" + error_message = await response.aread() + if error_message: + str_error_message = error_message.decode("utf-8") + raise httpx.RequestError( + f"Request failed with status {response.status_code}, error_message: " + f"{str_error_message}", + request=response.request, + ) + response_content = bytes_response_content.decode("utf-8") + if non_streaming_callback: + non_streaming_callback(response_content) + return response_content + + +async def _call_stream_request( + http_client: AsyncClient, + method: str, + base_url: str, + sse_output: bool, + data: Dict[str, Any] | None = None, + streaming_callback: Callable[[str], None] | None = None, +): + full_out = "" + async for out in _stream_request(http_client, method, base_url, sse_output, data): + if streaming_callback: + streaming_callback(out) + full_out += out + return full_out + + +async def _stream_request( + http_client: AsyncClient, + method: str, + base_url: str, + sse_output: bool, + data: Dict[str, Any] | None = None, +): + import json + + from dbgpt.core.awel.util.chat_util import parse_openai_output + + kwargs: Dict[str, Any] = {"url": base_url, "method": method} + if method in ["POST", "PUT"]: + kwargs["json"] = data + else: + kwargs["params"] = data + + async with http_client.stream(**kwargs) as response: + if response.status_code == 200: + async for line in response.aiter_lines(): + if not line: + continue + if sse_output: + out 
= parse_openai_output(line) + if not out.success: + raise ClientException(f"Failed to parse output: {out.text}") + yield out.text + else: + yield line + else: + try: + error = await response.aread() + yield json.loads(error) + except Exception as e: + raise e diff --git a/dbgpt/core/awel/dag/base.py b/dbgpt/core/awel/dag/base.py index f4c96dc2d..461321a6f 100644 --- a/dbgpt/core/awel/dag/base.py +++ b/dbgpt/core/awel/dag/base.py @@ -2,6 +2,7 @@ DAG is the core component of AWEL, it is used to define the relationship between tasks. """ + import asyncio import contextvars import logging @@ -613,10 +614,14 @@ class DAG: """ def __init__( - self, dag_id: str, resource_group: Optional[ResourceGroup] = None + self, + dag_id: str, + resource_group: Optional[ResourceGroup] = None, + tags: Optional[Dict[str, str]] = None, ) -> None: """Initialize a DAG.""" self._dag_id = dag_id + self._tags: Dict[str, str] = tags or {} self.node_map: Dict[str, DAGNode] = {} self.node_name_to_node: Dict[str, DAGNode] = {} self._root_nodes: List[DAGNode] = [] @@ -651,6 +656,22 @@ def dag_id(self) -> str: """Return the dag id of current DAG.""" return self._dag_id + @property + def tags(self) -> Dict[str, str]: + """Return the tags of current DAG.""" + return self._tags + + @property + def dev_mode(self) -> bool: + """Whether the current DAG is in dev mode. + + Returns: + bool: Whether the current DAG is in dev mode + """ + from ..operators.base import _dev_mode + + return _dev_mode() + def _build(self) -> None: from ..operators.common_operator import TriggerOperator diff --git a/dbgpt/core/awel/dag/dag_manager.py b/dbgpt/core/awel/dag/dag_manager.py index 611988145..683dbdf39 100644 --- a/dbgpt/core/awel/dag/dag_manager.py +++ b/dbgpt/core/awel/dag/dag_manager.py @@ -3,18 +3,49 @@ DAGManager will load DAGs from dag_dirs, and register the trigger nodes to TriggerManager. 
""" + import logging import threading -from typing import Dict, List, Optional +from collections import defaultdict +from typing import Dict, List, Optional, Set +from dbgpt._private.pydantic import BaseModel, Field, model_to_dict from dbgpt.component import BaseComponent, ComponentType, SystemApp +from .. import BaseOperator +from ..trigger.base import TriggerMetadata from .base import DAG from .loader import LocalFileDAGLoader logger = logging.getLogger(__name__) +class DAGMetadata(BaseModel): + """Metadata for the DAG.""" + + triggers: List[TriggerMetadata] = Field( + default_factory=list, description="The trigger metadata" + ) + sse_output: bool = Field( + default=False, description="Whether the DAG is a server-sent event output" + ) + streaming_output: bool = Field( + default=False, description="Whether the DAG is a streaming output" + ) + tags: Optional[Dict[str, str]] = Field( + default=None, description="The tags of the DAG" + ) + + def to_dict(self): + """Convert the metadata to dict.""" + triggers_dict = [] + for trigger in self.triggers: + triggers_dict.append(trigger.dict()) + dict_value = model_to_dict(self, exclude={"triggers"}) + dict_value["triggers"] = triggers_dict + return dict_value + + class DAGManager(BaseComponent): """The component of DAGManager.""" @@ -35,6 +66,8 @@ def __init__(self, system_app: SystemApp, dag_dirs: List[str]): self.system_app = system_app self.dag_map: Dict[str, DAG] = {} self.dag_alias_map: Dict[str, str] = {} + self._dag_metadata_map: Dict[str, DAGMetadata] = {} + self._tags_to_dag_ids: Dict[str, Dict[str, Set[str]]] = {} self._trigger_manager: Optional["DefaultTriggerManager"] = None def init_app(self, system_app: SystemApp): @@ -73,12 +106,26 @@ def register_dag(self, dag: DAG, alias_name: Optional[str] = None): if alias_name: self.dag_alias_map[alias_name] = dag_id + trigger_metadata: List["TriggerMetadata"] = [] + dag_metadata = _parse_metadata(dag) if self._trigger_manager: for trigger in dag.trigger_nodes: - 
self._trigger_manager.register_trigger(trigger, self.system_app) + tm = self._trigger_manager.register_trigger( + trigger, self.system_app + ) + if tm: + trigger_metadata.append(tm) self._trigger_manager.after_register() else: logger.warning("No trigger manager, not register dag trigger") + dag_metadata.triggers = trigger_metadata + self._dag_metadata_map[dag_id] = dag_metadata + tags = dag_metadata.tags + if tags: + for tag_key, tag_value in tags.items(): + if tag_key not in self._tags_to_dag_ids: + self._tags_to_dag_ids[tag_key] = defaultdict(set) + self._tags_to_dag_ids[tag_key][tag_value].add(dag_id) def unregister_dag(self, dag_id: str): """Unregister a DAG.""" @@ -104,7 +151,13 @@ def unregister_dag(self, dag_id: str): for trigger in dag.trigger_nodes: self._trigger_manager.unregister_trigger(trigger, self.system_app) # Finally remove the DAG from the map + metadata = self._dag_metadata_map[dag_id] del self.dag_map[dag_id] + del self._dag_metadata_map[dag_id] + if metadata.tags: + for tag_key, tag_value in metadata.tags.items(): + if tag_key in self._tags_to_dag_ids: + self._tags_to_dag_ids[tag_key][tag_value].remove(dag_id) def get_dag( self, dag_id: Optional[str] = None, alias_name: Optional[str] = None @@ -116,3 +169,33 @@ def get_dag( if alias_name in self.dag_alias_map: return self.dag_map.get(self.dag_alias_map[alias_name]) return None + + def get_dags_by_tag(self, tag_key: str, tag_value) -> List[DAG]: + """Get all DAGs with the given tag.""" + with self.lock: + dag_ids = self._tags_to_dag_ids.get(tag_key, {}).get(tag_value, set()) + return [self.dag_map[dag_id] for dag_id in dag_ids] + + def get_dag_metadata( + self, dag_id: Optional[str] = None, alias_name: Optional[str] = None + ) -> Optional[DAGMetadata]: + """Get a DAGMetadata by dag_id or alias_name.""" + dag = self.get_dag(dag_id, alias_name) + if not dag: + return None + return self._dag_metadata_map.get(dag.dag_id) + + +def _parse_metadata(dag: DAG): + from ..util.chat_util import 
_is_sse_output + + metadata = DAGMetadata() + metadata.tags = dag.tags + if not dag.leaf_nodes: + return metadata + end_node = dag.leaf_nodes[0] + if not isinstance(end_node, BaseOperator): + return metadata + metadata.sse_output = _is_sse_output(end_node) + metadata.streaming_output = end_node.streaming_operator + return metadata diff --git a/dbgpt/core/awel/flow/flow_factory.py b/dbgpt/core/awel/flow/flow_factory.py index b23633378..3f847c07c 100644 --- a/dbgpt/core/awel/flow/flow_factory.py +++ b/dbgpt/core/awel/flow/flow_factory.py @@ -18,6 +18,7 @@ model_validator, ) from dbgpt.core.awel.dag.base import DAG, DAGNode +from dbgpt.core.awel.dag.dag_manager import DAGMetadata from .base import ( OperatorType, @@ -352,6 +353,9 @@ class FlowPanel(BaseModel): description="The flow panel modified time.", examples=["2021-08-01 12:00:00", "2021-08-01 12:00:01", "2021-08-01 12:00:02"], ) + metadata: Optional[Union[DAGMetadata, Dict[str, Any]]] = Field( + default=None, description="The metadata of the flow" + ) @model_validator(mode="before") @classmethod diff --git a/dbgpt/core/awel/operators/base.py b/dbgpt/core/awel/operators/base.py index 6d486a1f6..58f3acabc 100644 --- a/dbgpt/core/awel/operators/base.py +++ b/dbgpt/core/awel/operators/base.py @@ -69,6 +69,15 @@ async def execute_workflow( default_runner: Optional[WorkflowRunner] = None +def _dev_mode() -> bool: + """Check if the operator is in dev mode. + + In production mode, the default runner is not None, and the operator will run in + the same process with the DB-GPT webserver. 
+ """ + return default_runner is None + + class BaseOperatorMeta(ABCMeta): """Metaclass of BaseOperator.""" @@ -86,7 +95,9 @@ def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any: if not executor: if system_app: executor = system_app.get_component( - ComponentType.EXECUTOR_DEFAULT, DefaultExecutorFactory + ComponentType.EXECUTOR_DEFAULT, + DefaultExecutorFactory, + default_component=DefaultExecutorFactory(), ).create() # type: ignore else: executor = DefaultExecutorFactory().create() @@ -173,13 +184,14 @@ def current_dag_context(self) -> DAGContext: def dev_mode(self) -> bool: """Whether the operator is in dev mode. - In production mode, the default runner is not None. + In production mode, the default runner is not None, and the operator will run in + the same process with the DB-GPT webserver. Returns: bool: Whether the operator is in dev mode. True if the default runner is None. """ - return default_runner is None + return _dev_mode() async def _run(self, dag_ctx: DAGContext, task_log_id: str) -> TaskOutput[OUT]: if not self.node_id: diff --git a/dbgpt/core/awel/trigger/base.py b/dbgpt/core/awel/trigger/base.py index 9b59cd94e..4e3db9998 100644 --- a/dbgpt/core/awel/trigger/base.py +++ b/dbgpt/core/awel/trigger/base.py @@ -1,13 +1,22 @@ """Base class for all trigger classes.""" -from __future__ import annotations from abc import ABC, abstractmethod -from typing import Any, Generic +from typing import Any, Generic, Optional + +from dbgpt._private.pydantic import BaseModel, Field from ..operators.common_operator import TriggerOperator from ..task.base import OUT +class TriggerMetadata(BaseModel): + """Metadata for the trigger.""" + + trigger_type: Optional[str] = Field( + default=None, description="The type of the trigger" + ) + + class Trigger(TriggerOperator[OUT], ABC, Generic[OUT]): """Base class for all trigger classes. 
diff --git a/dbgpt/core/awel/trigger/http_trigger.py b/dbgpt/core/awel/trigger/http_trigger.py index 0f0931292..22e025c13 100644 --- a/dbgpt/core/awel/trigger/http_trigger.py +++ b/dbgpt/core/awel/trigger/http_trigger.py @@ -43,7 +43,7 @@ from ..operators.common_operator import MapOperator from ..util._typing_util import _parse_bool from ..util.http_util import join_paths -from .base import Trigger +from .base import Trigger, TriggerMetadata if TYPE_CHECKING: from fastapi import APIRouter, FastAPI @@ -82,6 +82,17 @@ def _default_streaming_predict_func(body: "CommonRequestType") -> bool: return _parse_bool(streaming) +class HttpTriggerMetadata(TriggerMetadata): + """Trigger metadata.""" + + path: str = Field(..., description="The path of the trigger") + methods: List[str] = Field(..., description="The methods of the trigger") + + trigger_type: Optional[str] = Field( + default="http", description="The type of the trigger" + ) + + class BaseHttpBody(BaseModel): """Http body. @@ -444,7 +455,7 @@ def register_to_app(self) -> bool: def mount_to_router( self, router: "APIRouter", global_prefix: Optional[str] = None - ) -> None: + ) -> HttpTriggerMetadata: """Mount the trigger to a router. Args: @@ -466,8 +477,11 @@ def mount_to_router( )(dynamic_route_function) logger.info(f"Mount http trigger success, path: {path}") + return HttpTriggerMetadata(path=path, methods=self._methods) - def mount_to_app(self, app: "FastAPI", global_prefix: Optional[str] = None) -> None: + def mount_to_app( + self, app: "FastAPI", global_prefix: Optional[str] = None + ) -> HttpTriggerMetadata: """Mount the trigger to a FastAPI app. TODO: The performance of this method is not good, need to be optimized. 
@@ -498,6 +512,7 @@ def mount_to_app(self, app: "FastAPI", global_prefix: Optional[str] = None) -> N app.openapi_schema = None app.middleware_stack = None logger.info(f"Mount http trigger success, path: {path}") + return HttpTriggerMetadata(path=path, methods=self._methods) def remove_from_app( self, app: "FastAPI", global_prefix: Optional[str] = None diff --git a/dbgpt/core/awel/trigger/trigger_manager.py b/dbgpt/core/awel/trigger/trigger_manager.py index 0db4a8bef..45b040147 100644 --- a/dbgpt/core/awel/trigger/trigger_manager.py +++ b/dbgpt/core/awel/trigger/trigger_manager.py @@ -2,15 +2,16 @@ After DB-GPT started, the trigger manager will be initialized and register all triggers """ + import logging from abc import ABC, abstractmethod from collections import defaultdict -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple, Union from dbgpt.component import BaseComponent, ComponentType, SystemApp from ..util.http_util import join_paths -from .base import Trigger +from .base import Trigger, TriggerMetadata if TYPE_CHECKING: from fastapi import APIRouter @@ -23,7 +24,9 @@ class TriggerManager(ABC): """Base class for trigger manager.""" @abstractmethod - def register_trigger(self, trigger: Any, system_app: SystemApp) -> None: + def register_trigger( + self, trigger: Any, system_app: SystemApp + ) -> Optional[TriggerMetadata]: """Register a trigger to current manager.""" @abstractmethod @@ -65,10 +68,12 @@ def __init__( self._inited = False self._router_prefix = router_prefix self._router = router - self._trigger_map: Dict[str, Trigger] = {} + self._trigger_map: Dict[str, Tuple[Trigger, TriggerMetadata]] = {} self._router_tables: Dict[str, Set[str]] = defaultdict(set) - def register_trigger(self, trigger: Any, system_app: SystemApp) -> None: + def register_trigger( + self, trigger: Any, system_app: SystemApp + ) -> Optional[TriggerMetadata]: """Register a trigger to current 
manager.""" from .http_trigger import HttpTrigger @@ -86,13 +91,17 @@ def register_trigger(self, trigger: Any, system_app: SystemApp) -> None: if not app: raise ValueError("System app not initialized") # Mount to app, support dynamic route. - trigger.mount_to_app(app, self._router_prefix) + trigger_metadata = trigger.mount_to_app(app, self._router_prefix) else: - trigger.mount_to_router(self._router, self._router_prefix) - self._trigger_map[trigger_id] = trigger + trigger_metadata = trigger.mount_to_router( + self._router, self._router_prefix + ) + self._trigger_map[trigger_id] = (trigger, trigger_metadata) + return trigger_metadata except Exception as e: self._unregister_route_tables(path, methods) raise e + return None def unregister_trigger(self, trigger: Any, system_app: SystemApp) -> None: """Unregister a trigger to current manager.""" @@ -183,7 +192,9 @@ def init_app(self, system_app: SystemApp): if system_app and self.system_app.app: self._http_trigger = HttpTriggerManager() - def register_trigger(self, trigger: Any, system_app: SystemApp) -> None: + def register_trigger( + self, trigger: Any, system_app: SystemApp + ) -> Optional[TriggerMetadata]: """Register a trigger to current manager.""" from .http_trigger import HttpTrigger @@ -191,7 +202,9 @@ def register_trigger(self, trigger: Any, system_app: SystemApp) -> None: logger.info(f"Register trigger {trigger}") if not self._http_trigger: raise ValueError("Http trigger manager not initialized") - self._http_trigger.register_trigger(trigger, system_app) + return self._http_trigger.register_trigger(trigger, system_app) + else: + return None def unregister_trigger(self, trigger: Any, system_app: SystemApp) -> None: """Unregister a trigger to current manager.""" diff --git a/dbgpt/core/awel/util/chat_util.py b/dbgpt/core/awel/util/chat_util.py new file mode 100644 index 000000000..464e4ac7d --- /dev/null +++ b/dbgpt/core/awel/util/chat_util.py @@ -0,0 +1,323 @@ +"""The utility functions for chatting with the 
DAG task.""" + +import json +import traceback +from typing import Any, AsyncIterator, Dict, Optional + +from ...interface.llm import ModelInferenceMetrics, ModelOutput +from ...schema.api import ChatCompletionResponseStreamChoice +from ..operators.base import BaseOperator +from ..trigger.http_trigger import CommonLLMHttpResponseBody + + +def is_chat_flow_type(output_obj: Any, is_class: bool = False) -> bool: + """Check whether the output object is a chat flow type.""" + if is_class: + return output_obj in (str, CommonLLMHttpResponseBody, ModelOutput) + else: + chat_types = (str, CommonLLMHttpResponseBody) + return isinstance(output_obj, chat_types) + + +async def safe_chat_with_dag_task( + task: BaseOperator, request: Any, covert_to_str: bool = False +) -> ModelOutput: + """Chat with the DAG task. + + Args: + task (BaseOperator): The DAG task to be executed. + request (Any): The request to be passed to the DAG task. + covert_to_str (bool, optional): Whether to convert the output to string. + + Returns: + ModelOutput: The model output, the result is not incremental. + """ + try: + finish_reason = None + usage = None + metrics = None + error_code = 0 + text = "" + async for output in safe_chat_stream_with_dag_task( + task, request, False, covert_to_str=covert_to_str + ): + finish_reason = output.finish_reason + usage = output.usage + metrics = output.metrics + error_code = output.error_code + text = output.text + return ModelOutput( + error_code=error_code, + text=text, + metrics=metrics, + usage=usage, + finish_reason=finish_reason, + ) + except Exception as e: + return ModelOutput(error_code=1, text=str(e), incremental=False) + + +async def safe_chat_stream_with_dag_task( + task: BaseOperator, request: Any, incremental: bool, covert_to_str: bool = False +) -> AsyncIterator[ModelOutput]: + """Chat with the DAG task. + + This function is similar to `chat_stream_with_dag_task`, but it will catch the + exception and return the error message. 
+ + Args: + task (BaseOperator): The DAG task to be executed. + request (Any): The request to be passed to the DAG task. + incremental (bool): Whether the output is incremental. + covert_to_str (bool, optional): Whether to convert the output to string. + + Yields: + ModelOutput: The model output. + """ + try: + async for output in chat_stream_with_dag_task( + task, request, incremental, covert_to_str=covert_to_str + ): + yield output + except Exception as e: + simple_error_msg = str(e) + if not simple_error_msg: + simple_error_msg = traceback.format_exc() + yield ModelOutput(error_code=1, text=simple_error_msg, incremental=incremental) + finally: + if task.streaming_operator and task.dag: + await task.dag._after_dag_end(task.current_event_loop_task_id) + + +def _is_sse_output(task: BaseOperator) -> bool: + """Check whether the DAG task is a server-sent event output. + + Args: + task (BaseOperator): The DAG task. + + Returns: + bool: Whether the DAG task is a server-sent event output. + """ + return task.output_format is not None and task.output_format.upper() == "SSE" + + +async def chat_stream_with_dag_task( + task: BaseOperator, request: Any, incremental: bool, covert_to_str: bool = False +) -> AsyncIterator[ModelOutput]: + """Chat with the DAG task. + + Args: + task (BaseOperator): The DAG task to be executed. + request (Any): The request to be passed to the DAG task. + incremental (bool): Whether the output is incremental. + covert_to_str (bool, optional): Whether to convert the output to string. + + Yields: + ModelOutput: The model output. 
+ """ + is_sse = _is_sse_output(task) + if not task.streaming_operator: + try: + result = await task.call(request) + model_output = parse_single_output( + result, is_sse, covert_to_str=covert_to_str + ) + model_output.incremental = incremental + yield model_output + except Exception as e: + simple_error_msg = str(e) + if not simple_error_msg: + simple_error_msg = traceback.format_exc() + yield ModelOutput( + error_code=1, text=simple_error_msg, incremental=incremental + ) + else: + from dbgpt.model.utils.chatgpt_utils import OpenAIStreamingOutputOperator + + if OpenAIStreamingOutputOperator and isinstance( + task, OpenAIStreamingOutputOperator + ): + full_text = "" + async for output in await task.call_stream(request): + model_output = parse_openai_output(output) + # The output of the OpenAI streaming API is incremental + full_text += model_output.text + model_output.incremental = incremental + model_output.text = model_output.text if incremental else full_text + yield model_output + if not model_output.success: + break + else: + full_text = "" + previous_text = "" + async for output in await task.call_stream(request): + model_output = parse_single_output( + output, is_sse, covert_to_str=covert_to_str + ) + model_output.incremental = incremental + if task.incremental_output: + # Output is incremental, append the text + full_text += model_output.text + else: + # Output is not incremental, last output is the full text + full_text = model_output.text + if not incremental: + # Return the full text + model_output.text = full_text + else: + # Return the incremental text + delta_text = full_text[len(previous_text) :] + previous_text = ( + full_text + if len(full_text) > len(previous_text) + else previous_text + ) + model_output.text = delta_text + yield model_output + if not model_output.success: + break + + +def parse_single_output( + output: Any, is_sse: bool, covert_to_str: bool = False +) -> ModelOutput: + """Parse the single output. 
+ + Args: + output (Any): The output to parse. + is_sse (bool): Whether the output is in SSE format. + covert_to_str (bool, optional): Whether to convert the output to string. + Defaults to False. + + Returns: + ModelOutput: The parsed output. + """ + finish_reason: Optional[str] = None + usage: Optional[Dict[str, Any]] = None + metrics: Optional[ModelInferenceMetrics] = None + + if output is None: + error_code = 1 + text = "The output is None!" + elif isinstance(output, str): + if is_sse: + sse_output = parse_sse_data(output) + if sse_output is None: + error_code = 1 + text = "The output is not a SSE format" + else: + error_code = 0 + text = sse_output + else: + error_code = 0 + text = output + elif isinstance(output, ModelOutput): + error_code = output.error_code + text = output.text + finish_reason = output.finish_reason + usage = output.usage + metrics = output.metrics + elif isinstance(output, CommonLLMHttpResponseBody): + error_code = output.error_code + text = output.text + elif isinstance(output, dict): + error_code = 0 + text = json.dumps(output, ensure_ascii=False) + elif covert_to_str: + error_code = 0 + text = str(output) + else: + error_code = 1 + text = f"The output is not a valid format({type(output)})" + return ModelOutput( + error_code=error_code, + text=text, + finish_reason=finish_reason, + usage=usage, + metrics=metrics, + ) + + +def parse_openai_output(output: Any) -> ModelOutput: + """Parse the OpenAI output. + + Args: + output (Any): The output to parse. It must be a stream format. + + Returns: + ModelOutput: The parsed output. 
+ """ + text = "" + if not isinstance(output, str): + return ModelOutput( + error_code=1, + text="The output is not a stream format", + ) + if output.strip() == "data: [DONE]" or output.strip() == "data:[DONE]": + return ModelOutput(error_code=0, text="") + if not output.startswith("data:"): + return ModelOutput( + error_code=1, + text="The output is not a stream format", + ) + + sse_output = parse_sse_data(output) + if sse_output is None: + return ModelOutput(error_code=1, text="The output is not a SSE format") + json_data = sse_output.strip() + try: + dict_data = json.loads(json_data) + except Exception as e: + return ModelOutput( + error_code=1, + text=f"Invalid JSON data: {json_data}, {e}", + ) + if "choices" not in dict_data: + return ModelOutput( + error_code=1, + text=dict_data.get("text", "Unknown error"), + ) + choices = dict_data["choices"] + finish_reason: Optional[str] = None + if choices: + choice = choices[0] + delta_data = ChatCompletionResponseStreamChoice(**choice) + if delta_data.delta.content: + text = delta_data.delta.content + finish_reason = delta_data.finish_reason + return ModelOutput(error_code=0, text=text, finish_reason=finish_reason) + + +def parse_sse_data(output: str) -> Optional[str]: + r"""Parse the SSE data. + + Just keep the data part. + + Examples: + .. code-block:: python + + from dbgpt.core.awel.util.chat_util import parse_sse_data + + assert parse_sse_data("data: [DONE]") == "[DONE]" + assert parse_sse_data("data:[DONE]") == "[DONE]" + assert parse_sse_data("data: Hello") == "Hello" + assert parse_sse_data("data: Hello\n") == "Hello" + assert parse_sse_data("data: Hello\r\n") == "Hello" + assert parse_sse_data("data: Hi, what's up?") == "Hi, what's up?" + + Args: + output (str): The output. + + Returns: + Optional[str]: The parsed data. 
+ """ + if output.startswith("data:"): + output = output.strip() + if output.startswith("data: "): + output = output[6:] + else: + output = output[5:] + + return output + else: + return None diff --git a/dbgpt/serve/flow/service/service.py b/dbgpt/serve/flow/service/service.py index 8cc913bdf..e96230dd7 100644 --- a/dbgpt/serve/flow/service/service.py +++ b/dbgpt/serve/flow/service/service.py @@ -1,19 +1,13 @@ import json import logging -import traceback -from typing import Any, AsyncIterator, List, Optional, cast +from typing import AsyncIterator, List, Optional, cast import schedule from fastapi import HTTPException from dbgpt._private.pydantic import model_to_json from dbgpt.component import SystemApp -from dbgpt.core.awel import ( - DAG, - BaseOperator, - CommonLLMHttpRequestBody, - CommonLLMHttpResponseBody, -) +from dbgpt.core.awel import DAG, BaseOperator, CommonLLMHttpRequestBody from dbgpt.core.awel.dag.dag_manager import DAGManager from dbgpt.core.awel.flow.flow_factory import ( FlowCategory, @@ -22,10 +16,13 @@ fill_flow_panel, ) from dbgpt.core.awel.trigger.http_trigger import CommonLLMHttpTrigger +from dbgpt.core.awel.util.chat_util import ( + is_chat_flow_type, + safe_chat_stream_with_dag_task, + safe_chat_with_dag_task, +) from dbgpt.core.interface.llm import ModelOutput from dbgpt.core.schema.api import ( - ChatCompletionResponse, - ChatCompletionResponseChoice, ChatCompletionResponseStreamChoice, ChatCompletionStreamResponse, DeltaMessage, @@ -333,6 +330,11 @@ def get(self, request: QUERY_SPEC) -> Optional[ServerResponse]: flow = self.dao.get_one(query_request) if flow: fill_flow_panel(flow) + metadata = self.dag_manager.get_dag_metadata( + flow.dag_id, alias_name=flow.uid + ) + if metadata: + flow.metadata = metadata.to_dict() return flow def delete(self, uid: str) -> Optional[ServerResponse]: @@ -390,7 +392,14 @@ def get_list_by_page( Returns: List[ServerResponse]: The response """ - return self.dao.get_list_page(request, page, page_size) + 
page_result = self.dao.get_list_page(request, page, page_size) + for item in page_result.items: + metadata = self.dag_manager.get_dag_metadata( + item.dag_id, alias_name=item.uid + ) + if metadata: + item.metadata = metadata.to_dict() + return page_result async def chat_stream_flow_str( self, flow_uid: str, request: CommonLLMHttpRequestBody @@ -463,7 +472,7 @@ async def safe_chat_flow( incremental = request.incremental try: task = await self._get_callable_task(flow_uid) - return await _safe_chat_with_dag_task(task, request) + return await safe_chat_with_dag_task(task, request) except HTTPException as e: return ModelOutput(error_code=1, text=e.detail, incremental=incremental) except Exception as e: @@ -484,7 +493,7 @@ async def safe_chat_stream_flow( incremental = request.incremental try: task = await self._get_callable_task(flow_uid) - async for output in _safe_chat_stream_with_dag_task( + async for output in safe_chat_stream_with_dag_task( task, request, incremental ): yield output @@ -556,220 +565,7 @@ def _parse_flow_category(self, dag: DAG) -> FlowCategory: output = leaf_node.metadata.outputs[0] try: real_class = _get_type_cls(output.type_cls) - if common_http_trigger and _is_chat_flow_type(real_class, is_class=True): + if common_http_trigger and is_chat_flow_type(real_class, is_class=True): return FlowCategory.CHAT_FLOW except Exception: return FlowCategory.COMMON - - -def _is_chat_flow_type(output_obj: Any, is_class: bool = False) -> bool: - if is_class: - return ( - output_obj == str - or output_obj == CommonLLMHttpResponseBody - or output_obj == ModelOutput - ) - else: - chat_types = (str, CommonLLMHttpResponseBody) - return isinstance(output_obj, chat_types) - - -async def _safe_chat_with_dag_task(task: BaseOperator, request: Any) -> ModelOutput: - """Chat with the DAG task.""" - try: - finish_reason = None - usage = None - metrics = None - error_code = 0 - text = "" - async for output in _safe_chat_stream_with_dag_task(task, request, False): - 
finish_reason = output.finish_reason - usage = output.usage - metrics = output.metrics - error_code = output.error_code - text = output.text - return ModelOutput( - error_code=error_code, - text=text, - metrics=metrics, - usage=usage, - finish_reason=finish_reason, - ) - except Exception as e: - return ModelOutput(error_code=1, text=str(e), incremental=False) - - -async def _safe_chat_stream_with_dag_task( - task: BaseOperator, - request: Any, - incremental: bool, -) -> AsyncIterator[ModelOutput]: - """Chat with the DAG task.""" - try: - async for output in _chat_stream_with_dag_task(task, request, incremental): - yield output - except Exception as e: - yield ModelOutput(error_code=1, text=str(e), incremental=incremental) - finally: - if task.streaming_operator: - if task.dag: - await task.dag._after_dag_end(task.current_event_loop_task_id) - - -async def _chat_stream_with_dag_task( - task: BaseOperator, - request: Any, - incremental: bool, -) -> AsyncIterator[ModelOutput]: - """Chat with the DAG task.""" - is_sse = task.output_format and task.output_format.upper() == "SSE" - if not task.streaming_operator: - try: - result = await task.call(request) - model_output = _parse_single_output(result, is_sse) - model_output.incremental = incremental - yield model_output - except Exception as e: - yield ModelOutput(error_code=1, text=str(e), incremental=incremental) - else: - from dbgpt.model.utils.chatgpt_utils import OpenAIStreamingOutputOperator - - if OpenAIStreamingOutputOperator and isinstance( - task, OpenAIStreamingOutputOperator - ): - full_text = "" - async for output in await task.call_stream(request): - model_output = _parse_openai_output(output) - # The output of the OpenAI streaming API is incremental - full_text += model_output.text - model_output.incremental = incremental - model_output.text = model_output.text if incremental else full_text - yield model_output - if not model_output.success: - break - else: - full_text = "" - previous_text = "" - async for 
output in await task.call_stream(request): - model_output = _parse_single_output(output, is_sse) - model_output.incremental = incremental - if task.incremental_output: - # Output is incremental, append the text - full_text += model_output.text - else: - # Output is not incremental, last output is the full text - full_text = model_output.text - if not incremental: - # Return the full text - model_output.text = full_text - else: - # Return the incremental text - delta_text = full_text[len(previous_text) :] - previous_text = ( - full_text - if len(full_text) > len(previous_text) - else previous_text - ) - model_output.text = delta_text - yield model_output - if not model_output.success: - break - - -def _parse_single_output(output: Any, is_sse: bool) -> ModelOutput: - """Parse the single output.""" - finish_reason = None - usage = None - metrics = None - if output is None: - error_code = 1 - text = "The output is None!" - elif isinstance(output, str): - if is_sse: - sse_output = _parse_sse_data(output) - if sse_output is None: - error_code = 1 - text = "The output is not a SSE format" - else: - error_code = 0 - text = sse_output - else: - error_code = 0 - text = output - elif isinstance(output, ModelOutput): - error_code = output.error_code - text = output.text - finish_reason = output.finish_reason - usage = output.usage - metrics = output.metrics - elif isinstance(output, CommonLLMHttpResponseBody): - error_code = output.error_code - text = output.text - elif isinstance(output, dict): - error_code = 0 - text = json.dumps(output, ensure_ascii=False) - else: - error_code = 1 - text = f"The output is not a valid format({type(output)})" - return ModelOutput( - error_code=error_code, - text=text, - finish_reason=finish_reason, - usage=usage, - metrics=metrics, - ) - - -def _parse_openai_output(output: Any) -> ModelOutput: - """Parse the OpenAI output.""" - text = "" - if not isinstance(output, str): - return ModelOutput( - error_code=1, - text="The output is not a stream 
format", - ) - if output.strip() == "data: [DONE]" or output.strip() == "data:[DONE]": - return ModelOutput(error_code=0, text="") - if not output.startswith("data:"): - return ModelOutput( - error_code=1, - text="The output is not a stream format", - ) - - sse_output = _parse_sse_data(output) - if sse_output is None: - return ModelOutput(error_code=1, text="The output is not a SSE format") - json_data = sse_output.strip() - try: - dict_data = json.loads(json_data) - except Exception as e: - return ModelOutput( - error_code=1, - text=f"Invalid JSON data: {json_data}, {e}", - ) - if "choices" not in dict_data: - return ModelOutput( - error_code=1, - text=dict_data.get("text", "Unknown error"), - ) - choices = dict_data["choices"] - finish_reason: Optional[str] = None - if choices: - choice = choices[0] - delta_data = ChatCompletionResponseStreamChoice(**choice) - if delta_data.delta.content: - text = delta_data.delta.content - finish_reason = delta_data.finish_reason - return ModelOutput(error_code=0, text=text, finish_reason=finish_reason) - - -def _parse_sse_data(output: str) -> Optional[str]: - if output.startswith("data:"): - if output.startswith("data: "): - output = output[6:] - else: - output = output[5:] - - return output - else: - return None diff --git a/dbgpt/util/dbgpts/loader.py b/dbgpt/util/dbgpts/loader.py index eaf12f479..8545ad067 100644 --- a/dbgpt/util/dbgpts/loader.py +++ b/dbgpt/util/dbgpts/loader.py @@ -320,6 +320,51 @@ def _load_package_from_path(path: str): return parsed_packages +def _load_flow_package_from_path(name: str, path: str = INSTALL_DIR) -> FlowPackage: + raw_packages = _load_installed_package(path) + new_name = name.replace("_", "-") + packages = [p for p in raw_packages if p.package == name or p.name == name] + if not packages: + packages = [ + p for p in raw_packages if p.package == new_name or p.name == new_name + ] + if not packages: + raise ValueError(f"Can't find the package {name} or {new_name}") + flow_package = 
_parse_package_metadata(packages[0]) + if flow_package.package_type != "flow": + raise ValueError(f"Unsupported package type: {flow_package.package_type}") + return cast(FlowPackage, flow_package) + + +def _flow_package_to_flow_panel(package: FlowPackage) -> FlowPanel: + dict_value = { + "name": package.name, + "label": package.label, + "version": package.version, + "editable": False, + "description": package.description, + "source": package.repo, + "define_type": "json", + } + if isinstance(package, FlowJsonPackage): + dict_value["flow_data"] = package.read_definition_json() + elif isinstance(package, FlowPythonPackage): + dict_value["flow_data"] = { + "nodes": [], + "edges": [], + "viewport": { + "x": 213, + "y": 269, + "zoom": 0, + }, + } + dict_value["flow_dag"] = package.dag + dict_value["define_type"] = "python" + else: + raise ValueError(f"Unsupported package type: {package}") + return FlowPanel(**dict_value) + + class DBGPTsLoader(BaseComponent): """The loader of the dbgpts packages""" @@ -373,32 +418,8 @@ def get_flows(self) -> List[FlowPanel]: if package.package_type != "flow": continue package = cast(FlowPackage, package) - dict_value = { - "name": package.name, - "label": package.label, - "version": package.version, - "editable": False, - "description": package.description, - "source": package.repo, - "define_type": "json", - } - if isinstance(package, FlowJsonPackage): - dict_value["flow_data"] = package.read_definition_json() - elif isinstance(package, FlowPythonPackage): - dict_value["flow_data"] = { - "nodes": [], - "edges": [], - "viewport": { - "x": 213, - "y": 269, - "zoom": 0, - }, - } - dict_value["flow_dag"] = package.dag - dict_value["define_type"] = "python" - else: - raise ValueError(f"Unsupported package type: {package}") - panels.append(FlowPanel(**dict_value)) + flow_panel = _flow_package_to_flow_panel(package) + panels.append(flow_panel) return panels def _register_packages(self, package: BasePackage): diff --git 
a/docs/docs/dbgpts/introduction.md b/docs/docs/dbgpts/introduction.md new file mode 100644 index 000000000..1063a7297 --- /dev/null +++ b/docs/docs/dbgpts/introduction.md @@ -0,0 +1,280 @@ +# dbgpts + +[dbgpts](https://github.com/eosphoros-ai/dbgpts) contains some data apps, AWEL operators, AWEL workflows, agents and resources +which are built upon DB-GPT. + +## Introduction + +### Why We Need `dbgpts` + +In a production-level LLM application, there are many components that need to be +integrated, and you want to start your research and creativity quickly by using the +existing components. + +At the same time, we hope that the core components of DB-GPT stay simple and easy to +maintain, and some complex components can be developed in the form of plugins. + +So, we need a plugin system to extend the capabilities of DB-GPT, and `dbgpts` is the +plugin system, or a part of the DB-GPT ecosystem. + +### What Is `dbgpts` + +There are some concepts in `dbgpts`: +- `app`: It includes data apps, AWEL operators, AWEL workflows, agents and resources; sometimes we +call it a `dbgpts` app or a `dbgpts` package. +- `repo`: It is a repository of `dbgpts` apps. You can install a `dbgpts` app from a `dbgpts` repo; +the default `dbgpts` repo is [eosphoros-ai/dbgpts](https://github.com/eosphoros-ai/dbgpts), and you can +also create your own `dbgpts` repo or use someone else's `dbgpts` repo. + +### How To Run `dbgpts` + +1. When you install a `dbgpts` app, it will be loaded to your DB-GPT webserver automatically, +and you can use it in the DB-GPT webserver or trigger it by command line `dbgpt run ...`. +2. You can also run a `dbgpts` app as a command line tool: use it in your terminal with +`dbgpt run flow ...` and the `--local` option, and it will run the app in your local environment. 
+ +## Quick Start + +Let's install a `dbgpts` package named [awel-flow-simple-streaming-chat](https://github.com/eosphoros-ai/dbgpts/tree/main/workflow/awel-flow-simple-streaming-chat) + +```bash +dbgpt app install awel-flow-simple-streaming-chat -U +``` + +### Run The App Locally + +Then, you can run the app in your terminal: + +```bash +dbgpt run flow --local chat \ +--name awel-flow-simple-streaming-chat \ +--model "gpt-3.5-turbo" \ +--messages "hello" \ +--stream +``` +- `dbgpt run flow`: Means you want to run an AWEL workflow. +- `--local`: Means you want to run the workflow in your local environment without +starting the DB-GPT webserver. It will find the `app` installed in your local +environment and run it. You can also use `--file` to specify the python file. +- `--name`: The name of the app. +- `--model`: The LLM model you want to use; `awel-flow-simple-streaming-chat` will +use OpenAI LLM by default if you run it with `--local`. +- `--messages`: The messages you want to send to the LLM. +- `--stream`: Means you want to run the workflow in streaming mode. + +The output will be like this: + +```bash +You: hello +[~info] Chat stream started +[~info] JSON data: {"model": "gpt-3.5-turbo", "messages": "hello", "stream": true} +Bot: +Hello! How can I assist you today? +🎉 Chat stream finished, timecost: 1.12 s +``` + +### Run The App In DB-GPT Webserver + +After you install the `awel-flow-simple-streaming-chat` app, you can run it in the DB-GPT webserver. +Also, you can use the `dbgpt` command line tool to trigger the app. + +```bash +dbgpt run flow chat \ +--name awel-flow-simple-streaming-chat \ +--model "chatgpt_proxyllm" \ +--messages "hello" \ +--stream +``` + +Just remove the `--local` option, and the command will connect to the DB-GPT webserver and run the app. +You should also change the `--model` option to your model name in the DB-GPT webserver. 
+ +The output will be like this: + +```bash +You: hello +[~info] Chat stream started +[~info] JSON data: {"model": "chatgpt_proxyllm", "messages": "hello", "stream": true, "chat_param": "1ecd35d4-a60a-420b-8943-8fc44f7f054a", "chat_mode": "chat_flow"} +Bot: +Hello! How can I assist you today? +🎉 Chat stream finished, timecost: 0.98 s +``` + +## Run The App With `command` Mode + +In previous examples, we run the app in `chat` mode, but not all `dbgpts` apps support `chat` mode, +some apps support `command` mode, you can run the app with `dbgpt run flow cmd` command. + +### Run The App Locally + +```bash +dbgpt run flow --local cmd \ +--name awel-flow-simple-streaming-chat \ +-d ' +{ + "model": "gpt-3.5-turbo", + "messages": "hello", + "stream": true +} +' +``` + +We replace the `chat` mode with `cmd` mode, and use `-d` option to specify the data in JSON format. + +The output will be like this: + +```bash +[~info] Flow started +[~info] JSON data: {"model": "gpt-3.5-turbo", "messages": "hello", "stream": true} +Command output: +Hello! How can I assist you today? +🎉 Flow finished, timecost: 1.35 s +``` + +### Run The App In DB-GPT Webserver + +Just remove the `--local` option, then the command will connect to the DB-GPT webserver and run the app. + +```bash +dbgpt run flow cmd \ +--name awel-flow-simple-streaming-chat \ +-d ' +{ + "model": "chatgpt_proxyllm", + "messages": "hello", + "stream": true +} +' +``` + +The output will be like this: + +```bash +[~info] Flow started +[~info] JSON data: {"model": "chatgpt_proxyllm", "messages": "hello", "stream": true} +Command output: +Hello! How can I assist you today? +🎉 Flow finished, timecost: 1.09 s +``` + +## `chat` Mode vs `command` Mode + +In short, `chat` mode is used for chat applications, and `command` mode is used to +trigger the app with a command. 
+ +For example, if you want to load your documents into DB-GPT, you can use `command` mode +to trigger the app to load the documents; it always runs once and the result will be +returned. + +And `chat` mode is a special case of `command` mode: it provides a chat interface to +the user, and you can chat with the LLM in an interactive way. + + +## Run Your App With Python Script + +If you run the app locally, it will find the app which is installed in your local environment; +you can also run the app by providing the python file. + +Let's create a python file named `simple_chat_app.py`: + +```python +import os +from dbgpt._private.pydantic import BaseModel, Field +from dbgpt.core import ModelMessage, ModelRequest +from dbgpt.core.awel import DAG, HttpTrigger, MapOperator +from dbgpt.model.proxy import OpenAILLMClient +from dbgpt.model.operators import LLMOperator + + +class TriggerReqBody(BaseModel): + model: str = Field(..., description="Model name") + messages: str = Field(..., description="User input") + + +class RequestHandleOperator(MapOperator[TriggerReqBody, ModelRequest]): + def __init__(self, **kwargs): + super().__init__(**kwargs) + + async def map(self, input_value: TriggerReqBody) -> ModelRequest: + messages = [ModelMessage.build_human_message(input_value.messages)] + return ModelRequest.build_request(input_value.model, messages) + + +with DAG("dbgpts_simple_chat_app") as dag: + # Receive http request and trigger dag to run. + trigger = HttpTrigger( + "/dbgpts/simple_chat_app", methods="POST", request_body=TriggerReqBody + ) + llm_client = OpenAILLMClient( + model_alias="gpt-3.5-turbo", # or other models, eg. 
"gpt-4o" + api_base=os.getenv("OPENAI_API_BASE"), + api_key=os.getenv("OPENAI_API_KEY"), + ) + request_handle_task = RequestHandleOperator() + llm_task = LLMOperator(llm_client=llm_client) + model_parse_task = MapOperator(lambda out: out.text) + trigger >> request_handle_task >> llm_task >> model_parse_task +``` + +Then you can run the app by providing the python file: + +```bash +dbgpt run flow --local --file simple_chat_app.py \ +chat \ +--name dbgpts_simple_chat_app \ +--model "gpt-3.5-turbo" \ +--messages "hello" +``` + +The output will be like this: + +```bash +You: hello +[~info] Chat started +[~info] JSON data: {"model": "gpt-3.5-turbo", "messages": "hello", "stream": false} +Bot: +Hello! How can I assist you today? + +🎉 Chat stream finished, timecost: 1.06 s +``` + +And you can run previous examples with `command` mode. + +```bash +dbgpt run flow --local --file simple_chat_app.py \ +cmd \ +--name dbgpts_simple_chat_app \ +-d ' +{ + "model": "gpt-3.5-turbo", + "messages": "hello" +}' +``` + +The output will be like this: + +```bash +[~info] Flow started +[~info] JSON data: {"model": "gpt-3.5-turbo", "messages": "hello"} +Command output: +Hello! How can I assist you today? +🎉 Flow finished, timecost: 1.04 s +``` + +## Show Your App In DB-GPT Webserver + +When you install the workflow, you can see the workflow in the DB-GPT webserver, you can open +the **AWEL Flow** page, then you can see the workflow named `awel_flow_simple_streaming_chat`. + +

+ +

+ +Then you can click the `edit` button to see the details of the workflow. +

+ +

+ +Note: Not all workflows support editing. There are two types of workflows according to the +definition type: `json` and `python`. The `json` type workflow can be edited in the DB-GPT. +We will show you more details in the next sections. diff --git a/docs/sidebars.js b/docs/sidebars.js index 2e26dfe5f..75c536dec 100755 --- a/docs/sidebars.js +++ b/docs/sidebars.js @@ -621,6 +621,16 @@ const sidebars = { } ], }, + { + type: "category", + label: "dbgpts", + items: [ + { + type: "doc", + id: 'dbgpts/introduction', + } + ] + } ], }, diff --git a/docs/static/img/dbgpts/awel_flow_simple_streaming_chat_1.png b/docs/static/img/dbgpts/awel_flow_simple_streaming_chat_1.png new file mode 100644 index 000000000..4b58ee1fb Binary files /dev/null and b/docs/static/img/dbgpts/awel_flow_simple_streaming_chat_1.png differ diff --git a/docs/static/img/dbgpts/awel_flow_simple_streaming_chat_2.png b/docs/static/img/dbgpts/awel_flow_simple_streaming_chat_2.png new file mode 100644 index 000000000..8d023a067 Binary files /dev/null and b/docs/static/img/dbgpts/awel_flow_simple_streaming_chat_2.png differ diff --git a/examples/awel/simple_chat_dag_example.py b/examples/awel/simple_chat_dag_example.py index bb0fc7242..2848e2ad1 100644 --- a/examples/awel/simple_chat_dag_example.py +++ b/examples/awel/simple_chat_dag_example.py @@ -14,6 +14,7 @@ "user_input": "hello" }' """ + from dbgpt._private.pydantic import BaseModel, Field from dbgpt.core import ModelMessage, ModelRequest from dbgpt.core.awel import DAG, HttpTrigger, MapOperator @@ -35,7 +36,7 @@ async def map(self, input_value: TriggerReqBody) -> ModelRequest: return ModelRequest.build_request(input_value.model, messages) -with DAG("dbgpt_awel_simple_dag_example") as dag: +with DAG("dbgpt_awel_simple_dag_example", tags={"label": "example"}) as dag: # Receive http request and trigger dag to run. trigger = HttpTrigger( "/examples/simple_chat", methods="POST", request_body=TriggerReqBody