feat: simple agent #450
base: development
Conversation
feat: generic callback for ConfigurableROS2TopicAPI
feat: langchain HRIConnector callback
@coderabbitai full review
Walkthrough

This pull request introduces new agent and ROS 2 communication components. It adds a standalone ROS 2 agent script and several new modules in the agent framework, including a language-based simple agent and a callback handler (HRICallbackHandler) for token processing. The changes also update the tool runner's logger initialization, add a boolean method to the HRIMessage class, and enhance ROS 2 topic handling by queuing messages with a generic callback. Additionally, the signature of a key method in the ROS 2 connector is modified to assign a default message author.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Main as Main Script
    participant ROS2 as rclpy
    participant Connector as ROS2HRIConnector
    participant Agent as SimpleAgent
    Main->>ROS2: rclpy.init()
    Main->>Connector: Create connector (set targets & sources)
    Main->>Agent: Instantiate SimpleAgent with connector
    Agent->>Connector: receive_all_connectors() to collect messages
    Agent->>Agent: _reduce_messages() and process via LLM/HRICallbackHandler
    Main->>ROS2: rclpy.shutdown()
```
```mermaid
sequenceDiagram
    participant LLM as Language Model
    participant Callback as HRICallbackHandler
    participant Target as HRIConnector Target
    LLM->>Callback: on_llm_new_token(token)
    Callback->>Callback: Aggregate token (if aggregation enabled)
    alt Buffer full or splitting condition met
        Callback->>Target: _send_all_targets(token)
        Callback->>Callback: Clear buffer
    else Immediate dispatch
        Callback->>Target: _send_all_targets(token)
    end
    LLM->>Callback: on_llm_end(response)
    Callback->>Target: Send any remaining buffered tokens
```
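The token-buffering behaviour in the second diagram can be sketched roughly as follows. This is an illustration of the described flow only, not the PR's actual HRICallbackHandler; the field and method names are assumptions, and the `sent` list stands in for dispatching to connector targets.

```python
import threading
from typing import List, Optional


class TokenBufferSketch:
    """Hypothetical sketch of the aggregation flow shown above."""

    def __init__(self, splitting_chars: Optional[List[str]] = None, max_buffer_size: int = 200) -> None:
        self.splitting_chars = splitting_chars or ["\n", ".", "!", "?"]
        self.max_buffer_size = max_buffer_size
        self.buffer = ""
        self._lock = threading.Lock()
        self.sent: List[str] = []  # stands in for sending to connector targets

    def on_llm_new_token(self, token: str) -> None:
        with self._lock:
            self.buffer += token
            # Flush when the buffer is full or the token ends a natural chunk.
            if len(self.buffer) >= self.max_buffer_size or any(c in token for c in self.splitting_chars):
                self.sent.append(self.buffer)
                self.buffer = ""

    def on_llm_end(self) -> None:
        # Flush whatever remains once generation finishes.
        with self._lock:
            if self.buffer:
                self.sent.append(self.buffer)
                self.buffer = ""
```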
Suggested reviewers
Actionable comments posted: 4
🧹 Nitpick comments (11)
src/rai_core/rai/agents/langchain/__init__.py (1)
1-1: Update the copyright year.

The copyright year is set to 2025, but it should be 2024 since we're still in 2024.

```diff
-# Copyright (C) 2025 Robotec.AI
+# Copyright (C) 2024 Robotec.AI
```

examples/ros2_simple_agent.py (3)
1-1: Update the copyright year.

The copyright year is set to 2025, but it should be 2024 since we're still in 2024.

```diff
-# Copyright (C) 2025 Robotec.AI
+# Copyright (C) 2024 Robotec.AI
```
12-12: Fix typo in the license text.

There's a typo in the license text: "goveself.rning" instead of "governing".

```diff
-# See the License for the specific language goveself.rning permissions and
+# See the License for the specific language governing permissions and
```
20-25: Add error handling to the main() function.

The current implementation lacks error handling, which could lead to unhandled exceptions if there are issues with ROS 2 initialization, connector creation, or agent execution.

```diff
 def main():
-    rclpy.init()
-    connector = ROS2HRIConnector(targets=["/to_human"], sources=["/from_human"])
-    agent = SimpleAgent(connectors={"ros2": connector})  # type: ignore
-    agent.run()
-    rclpy.shutdown()
+    try:
+        rclpy.init()
+        connector = ROS2HRIConnector(targets=["/to_human"], sources=["/from_human"])
+        agent = SimpleAgent(connectors={"ros2": connector})  # type: ignore
+        agent.run()
+    except Exception as e:
+        print(f"Error occurred during agent execution: {e}")
+    finally:
+        rclpy.shutdown()
```

src/rai_core/rai/agents/simple_agent.py (3)
40-52: Consider adding a termination mechanism to the infinite loop.

The run method implements an infinite loop without any apparent way to terminate it. Consider adding a condition to break out of the loop when needed (e.g., a flag, signal handler, or other termination mechanism).

```diff
-    def run(self):
-        while True:
+    def run(self, should_continue=lambda: True):
+        while should_continue():
             received_messages = {}
             try:
                 received_messages = self.receive_all_connectors()
```
52-52: Consider making the sleep duration configurable.

The hardcoded 0.3 second sleep duration might not be optimal for all use cases. Consider making this value configurable.

```diff
-            time.sleep(0.3)
+            time.sleep(self.polling_interval)
```

And update the constructor:

```diff
     def __init__(
         self,
         connectors: dict[str, HRIConnector[HRIMessage]],
         llm: Optional[BaseChatModel] = None,
         state: Optional[SimpleAgentState] = None,
+        polling_interval: float = 0.3,
     ):
         super().__init__(connectors=connectors)
         self.logger = logging.getLogger(__name__)
         self.agent = create_simple_agent(llm=llm)
         self.callback = HRICallbackHandler(connectors=connectors, aggregate_chunks=True)
         self.state = state or SimpleAgentState(messages=[])
+        self.polling_interval = polling_interval
```
65-75: Consider improving message reduction logic.

The current implementation concatenates text from all sources with simple newlines, which might not preserve the context or structure of the original messages. Consider a more structured approach for combining messages that better preserves their original context.
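One hedged option for a more structured reduction, purely illustrative (the function name and the shape of the received-message mapping are assumptions, not the PR's code):

```python
from typing import Dict, List


def reduce_messages_sketch(received: Dict[str, List[str]]) -> str:
    """Combine per-source message texts while keeping track of where each came from."""
    sections = []
    for source, texts in received.items():
        body = "\n".join(texts)
        # Prefix each block with its source so the LLM sees attributed context.
        sections.append(f"[{source}]\n{body}")
    return "\n\n".join(sections)


# Example: messages from two sources stay attributed to their origin.
print(reduce_messages_sketch({"/from_human": ["hello"], "/camera": ["image description"]}))
```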
src/rai_core/rai/agents/langchain/simple_agent.py (1)
67-110: Good agent creation implementation with tool integration.

The create_simple_agent function is well-implemented with proper handling of optional parameters, sensible defaults, and good tool integration. The state graph approach is elegant. Consider adding a brief comment explaining what tools_condition does or how it determines which edge to follow for better code maintainability.

```diff
     graph.add_conditional_edges(
         "llm",
         tools_condition,
+        # tools_condition routes to "tools" if AIMessage contains tool calls, otherwise to "END"
     )
```
src/rai_core/rai/agents/langchain/callback.py (3)
55-65: Simplify buffer logic for better readability.

The current nested conditional structure makes the logic harder to follow. Consider restructuring for clarity.

```diff
         if self.aggregate_chunks:
             with self._buffer_lock:
                 self.chunks_buffer += token
-                if len(self.chunks_buffer) < self.max_buffer_size:
-                    if self._should_split(token):
-                        self._send_all_targets(self.chunks_buffer)
-                        self.chunks_buffer = ""
-                else:
+                should_send = len(self.chunks_buffer) >= self.max_buffer_size or self._should_split(token)
+                if should_send:
                     self._send_all_targets(self.chunks_buffer)
                     self.chunks_buffer = ""
```
28-42: Consider adding customization options for message creation.

The handler creates AIMessages with fixed parameters. Consider adding options to customize the message author or add metadata. Adding a parameter for the message author would make the handler more flexible:

```diff
     def __init__(
         self,
         connectors: dict[str, HRIConnector[HRIMessage]],
         aggregate_chunks: bool = False,
         splitting_chars: Optional[List[str]] = None,
         max_buffer_size: int = 200,
+        message_author: str = "AI",
     ):
         self.connectors = connectors
         self.aggregate_chunks = aggregate_chunks
         self.splitting_chars = splitting_chars or ["\n", ".", "!", "?"]
         self.chunks_buffer = ""
         self.max_buffer_size = max_buffer_size
+        self.message_author = message_author
         self._buffer_lock = threading.Lock()
```

Then update the message creation in _send_all_targets:

```diff
     def _send_all_targets(self, token: str):
         for connector_name, connector in self.connectors.items():
-            connector.send_all_targets(AIMessage(content=token))
+            connector.send_all_targets(AIMessage(content=token, name=self.message_author))
             logger.debug(f"Sent token to {connector_name}")
```
31-31: Type hint compatibility with different Python versions.

The type annotation dict[str, HRIConnector[HRIMessage]] uses Python 3.9+ syntax. For better compatibility with earlier Python versions, consider using Dict from typing.

```diff
-from typing import List, Optional
+from typing import Dict, List, Optional

 # Then use:
-        connectors: dict[str, HRIConnector[HRIMessage]],
+        connectors: Dict[str, HRIConnector[HRIMessage]],
```
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- examples/ros2_simple_agent.py (1 hunks)
- src/rai_core/rai/agents/langchain/__init__.py (1 hunks)
- src/rai_core/rai/agents/langchain/callback.py (1 hunks)
- src/rai_core/rai/agents/langchain/simple_agent.py (1 hunks)
- src/rai_core/rai/agents/simple_agent.py (1 hunks)
- src/rai_core/rai/agents/tool_runner.py (1 hunks)
- src/rai_core/rai/communication/hri_connector.py (1 hunks)
- src/rai_core/rai/communication/ros2/api.py (3 hunks)
- src/rai_core/rai/communication/ros2/connectors.py (1 hunks)
🔇 Additional comments (10)
src/rai_core/rai/communication/hri_connector.py (1)
66-67: Great addition of the __bool__ method.

This method enhances the usability of HRIMessage by enabling boolean evaluation in conditional statements. The implementation correctly returns True if any content attribute is non-empty, following Python's truth value testing conventions.
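For reference, a minimal sketch of that truthiness convention. The class and attribute names here are hypothetical stand-ins; the actual HRIMessage fields may differ.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class HRIMessageSketch:
    """Hypothetical stand-in for HRIMessage; real attribute names may differ."""
    text: str = ""
    images: List[str] = field(default_factory=list)
    audios: List[str] = field(default_factory=list)

    def __bool__(self) -> bool:
        # Truthy when any content attribute is non-empty.
        return bool(self.text) or bool(self.images) or bool(self.audios)


# Usage: empty messages evaluate to False in conditionals.
assert not HRIMessageSketch()
assert HRIMessageSketch(text="hello")
```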
src/rai_core/rai/agents/langchain/__init__.py (1)

15-18: Module structure looks good.

The file correctly imports and exports the necessary components for the langchain module.
src/rai_core/rai/communication/ros2/connectors.py (1)
337-337: Good improvement with default parameter value.

Adding a default value of "human" for the message_author parameter enhances the method's usability by making it more convenient to call.

src/rai_core/rai/agents/tool_runner.py (1)
42-45: Good enhancement to logger parameter initialization.

Making the logger parameter optional with a sensible default fallback improves the API flexibility. This change allows for easier instantiation of ToolRunner while ensuring a valid logger is always available.
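A common shape for this kind of fallback, shown as a sketch only (the real ToolRunner constructor takes additional arguments):

```python
import logging
from typing import Optional


class ToolRunnerSketch:
    """Hypothetical sketch; the real ToolRunner takes more arguments."""

    def __init__(self, logger: Optional[logging.Logger] = None) -> None:
        # Fall back to a module-level logger when none is provided.
        self.logger = logger or logging.getLogger(__name__)
```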
src/rai_core/rai/communication/ros2/api.py (2)

387-390: Good message queuing mechanism implementation.

The implementation of a queue-based message handling system using defaultdict(Queue) is a solid approach for asynchronous message processing. The _generic_callback method properly separates message receipt from processing logic.
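To illustrate the pattern being praised here, a minimal sketch of topic-keyed queues. The method name `_generic_callback` mirrors the review, but the surrounding class and fields are hypothetical and not the actual ConfigurableROS2TopicAPI.

```python
from collections import defaultdict
from queue import Empty, Queue
from typing import Any, DefaultDict


class TopicQueueSketch:
    """Hypothetical illustration of per-topic message queuing."""

    def __init__(self) -> None:
        # One FIFO queue per topic name, created lazily on first access.
        self._message_queues: DefaultDict[str, Queue] = defaultdict(Queue)

    def _generic_callback(self, topic: str, msg: Any) -> None:
        # Subscription callback: only enqueue; processing happens elsewhere.
        self._message_queues[topic].put(msg)

    def take_message(self, topic: str) -> Any:
        # Consumer side: returns None when nothing is waiting.
        try:
            return self._message_queues[topic].get(block=False)
        except Empty:
            return None
```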
433-433: ✅ Verification successful

Verify queue consumption workflow.

The changed callback mechanism now queues messages instead of processing them directly. Make sure there's a process in place elsewhere to dequeue and process these messages.

🏁 Script executed:

```shell
#!/bin/bash
# Search for code that consumes messages from the queues
rg -A 3 "message_data\[.+\]\.get" --type py
```

Length of output: 45
Action Required: Verify Queue Consumption Workflow
The automated search using the provided pattern did not yield any matching code for dequeuing or processing queued messages. This suggests that either a different mechanism is being used or that the code responsible for consuming queued messages may be located elsewhere or named differently. Please verify manually that there is a process elsewhere in the codebase that dequeues and processes these messages (for example, a usage of queue.get(), a dedicated consumer method, or similar logic).
- Confirm if there is any function or routine (like process_queued_messages) that explicitly consumes messages from the queue.
- If the consumption mechanism exists under a different naming or structure, update the test/search pattern accordingly.
- If not, consider adding or documenting a consumer process to avoid potential message backlog issues.
To further assist, I've generated an additional shell script to search for any occurrence of queue.get( in the repository. Please run this script and review the output to ensure that a consumer process is indeed implemented.

🏁 Script executed:

```shell
#!/bin/bash
# Search for code that dequeues messages using the queue.get() method in Python files
rg -A 3 "queue\.get(" --type py
```

Length of output: 228
Action Required: Confirm Message Dequeueing Process
Our initial search for a consumer process using the message_data[…].get pattern and the queue.get( method did not return clear results. It appears the regex used initially had an issue, so please run the following corrected script to search for any consumer logic that dequeues queued messages. Once this script is executed, manually verify whether there is a dedicated function (e.g., a process_queued_messages routine or similar) handling the consumption of queued messages. If such a process exists under a different naming or structure, please update the documentation accordingly. If no consumer is found, consider implementing (or documenting the plan for) a mechanism to dequeue and process the queued messages to avoid potential message backlog issues.
🏁 Script executed:
```shell
#!/bin/bash
# Search for code that calls the queue.get() method in Python files
rg -A 3 "queue\\.get\\(" --type py
```

Length of output: 603
Queue Consumption Workflow Verified
The updated callback now enqueues messages as intended. We confirmed that the consumer process exists: the HMI components in the codebase (notably in src/rai_hmi/rai_hmi/voice_hmi.py and src/rai_hmi/rai_hmi/text_hmi.py) actively dequeue messages using mission_queue.get().

Please double-check that these consumers are appropriately integrated with the new workflow and can handle the production load. Any necessary adjustments or additional documentation regarding the consumption process should be made accordingly.
src/rai_core/rai/agents/simple_agent.py (1)
27-39: Good implementation of SimpleAgent class.

The initialization method correctly sets up the necessary components including connectors, language model, callback handler, and state management with sensible defaults.
src/rai_core/rai/agents/langchain/simple_agent.py (2)
30-39: Well-structured state definition with clear annotations.

The SimpleAgentState TypedDict is well-defined with appropriate type annotations and documentation.
42-64: Well-implemented LLM processing function.

The llm_node function provides a clear interface for processing messages with proper documentation and error handling.
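As a rough illustration of what such a node might look like in a LangGraph-style state graph. This is a hedged sketch: the state keys, the stand-in names, and the way the model is passed in are assumptions, not the PR's actual code.

```python
from typing import List, TypedDict

from langchain_core.language_models import BaseChatModel
from langchain_core.messages import BaseMessage


class SimpleAgentStateSketch(TypedDict):
    """Hypothetical state shape; the real SimpleAgentState may carry more fields."""
    messages: List[BaseMessage]


def llm_node_sketch(state: SimpleAgentStateSketch, llm: BaseChatModel) -> SimpleAgentStateSketch:
    # Invoke the model on the accumulated conversation and append its reply.
    response = llm.invoke(state["messages"])
    return {"messages": state["messages"] + [response]}
```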
src/rai_core/rai/agents/langchain/callback.py (1)

73-77: LGTM: Proper cleanup on LLM completion.

Good practice to ensure any remaining content in the buffer is sent when the LLM process ends.
Purpose
Rewrite: the simplest agent, built on top of the BaseAgent and HRIConnector APIs.
Proposed Changes
SimpleAgent with input and output gathered/sent via the new connector API.
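A minimal usage sketch mirroring the example script reviewed above. The import paths are inferred from the repository layout (src/rai_core/rai/...) and may differ from the actual package structure.

```python
# Sketch of the proposed usage; import paths are assumptions, topic names come from the example.
import rclpy

from rai.agents.simple_agent import SimpleAgent
from rai.communication.ros2.connectors import ROS2HRIConnector


def main():
    rclpy.init()
    # Messages arrive from /from_human and replies are published to /to_human.
    connector = ROS2HRIConnector(targets=["/to_human"], sources=["/from_human"])
    agent = SimpleAgent(connectors={"ros2": connector})
    agent.run()
    rclpy.shutdown()


if __name__ == "__main__":
    main()
```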
Issues
Testing
Summary by CodeRabbit
New Features
Refactor