Skip to content

Guide: Synchronous vs. Streaming Responses

Rob Royce edited this page Sep 2, 2024 · 5 revisions

Note

Starting in version v1.0.6, the ROSA class provides two primary methods for interacting with the agent: invoke and astream. These methods allow you to send queries to the agent and receive responses, with different behaviors for synchronous and asynchronous operations.

Synchronous Responses with invoke

The invoke method provides a synchronous way to interact with the ROSA agent. It processes the query and returns the complete response in a single operation.

Usage:

rosa = ROSA(ros_version=2, llm=your_llm_instance)
response = rosa.invoke("What is the current status of the robot?")
print(response)

Features:

  • Synchronous operation: Waits for the complete response before returning.
  • Token usage tracking: If enabled, provides information about token consumption.
  • Chat history: Automatically updates the chat history with the query and response.
  • Error handling: Catches and returns any errors that occur during processing.

Asynchronous Streaming with astream

The astream method offers an asynchronous, streaming interface to interact with the ROSA agent. It yields events as they occur, allowing for real-time processing of the agent's response.

Usage:

rosa = ROSA(ros_version=2, llm=your_llm_instance, streaming=True)
async for event in rosa.astream("Plan a path to the nearest charging station"):
    if event['type'] == 'token':
        print(event['content'], end='', flush=True)
    elif event['type'] in ['tool_start', 'tool_end']:
        print(f"\n{event['type']}: {event['name']}")
    elif event['type'] == 'final':
        print(f"\nFinal output: {event['content']}")
    elif event['type'] == 'error':
        print(f"\nError: {event['content']}")

Important

You must set streaming=True when creating the ROSA instance in order to use astream. It is set to False by default and will raise an error if you try to use the astream method when streaming=False.

Features:

  • Asynchronous operation: Yields events as they occur, enabling real-time processing.
  • Streaming response: Provides incremental updates as the agent generates its response.
  • Tool usage visibility: Notifies when tools are started and completed.
  • Final output: Delivers the complete response at the end of the stream.
  • Error handling: Yields error events if issues occur during processing.

Event Types:

  • token: Individual tokens of the generated response.
  • tool_start: Indicates the beginning of a tool's execution.
  • tool_end: Signals the completion of a tool's execution.
  • final: Provides the complete, final response from the agent.
  • error: Indicates any errors that occurred during processing.

Choosing Between invoke and astream

  • Use invoke when:

    • You need a simple, synchronous interaction.
    • You want to track token usage easily.
    • Real-time updates are not necessary.
  • Use astream when:

    • You want to provide a responsive, real-time interface.
    • You need to process or display incremental results.
    • You're building an application that benefits from streaming responses.

Note

To use astream, ensure that you initialize the ROSA instance with streaming=True.

By leveraging these methods, you can create flexible and powerful interactions with your ROS-based systems through natural language queries.

Example Usage in TurtleAgent

The TurtleAgent class in turtle_agent.py demonstrates how to use both synchronous and streaming responses with ROSA. Here's an overview of its implementation:

  1. Initialization:

    class TurtleAgent(ROSA):
        def __init__(self, streaming: bool = False, verbose: bool = True):
            # ... initialization code ...
            super().__init__(
                ros_version=1,
                llm=self.__llm,
                tools=[cool_turtle_tool, blast_off],
                tool_packages=[turtle_tools],
                blacklist=self.__blacklist,
                prompts=self.__prompts,
                verbose=verbose,
                accumulate_chat_history=True,
                streaming=streaming,
            )
  2. Handling user input:

    async def run(self):
        # ... setup code ...
        while True:
            input = self.get_input("> ")
            if input == "exit":
                break
            elif input in self.command_handler:
                await self.command_handler[input]()
            else:
                await self.submit(input)
  3. Submitting queries:

    async def submit(self, query: str):
        if self.__streaming:
            await self.stream_response(query)
        else:
            self.print_response(query)
  4. Synchronous response handling:

    def print_response(self, query: str):
        response = self.invoke(query)
        # ... code to display response ...
  5. Asynchronous streaming response handling:

    async def stream_response(self, query: str):
        async for event in self.astream(query):
            # ... code to handle different event types ...

This implementation showcases how to:

  • Initialize ROSA with custom tools and settings
  • Handle user input in a loop
  • Use both synchronous (invoke) and asynchronous streaming (astream) methods
  • Process and display streaming events in real-time
  • Provide additional features like command handling and example queries

By following this example, you can create your own interactive agents that leverage ROSA's capabilities for both synchronous and streaming interactions with ROS systems.