import json from dataclasses import dataclass from typing import Optional, List, Dict, Any from rich.console import Console from qwen_agent.agents import Assistant @dataclass class Agent: model: str server: str api_key: str max_tokens: int = 30000 enable_thinking: bool = True tools: Optional[List[Dict[str, Any]]] = None console: Console = Console() def __post_init__(self): if self.tools is None: self.tools = [ {'mcpServers': { 'time': { 'command': 'uvx', 'args': ['mcp-server-time', '--local-timezone=Europe/London'] }, "fetch": { "command": "uvx", "args": ["mcp-server-fetch"] }, "ddg-search": { "command": "npx", "args": ["-y", "duckduckgo-mcp-server"] }, }}, 'code_interpreter', ] def run(self, prompt: str) -> None: """Run the agent with the given prompt""" llm_cfg = { 'model': self.model, 'model_server': self.server, 'api_key': self.api_key, } # Define Agent bot = Assistant(llm=llm_cfg, function_list=self.tools) # Streaming generation messages = [{'role': 'user', 'content': prompt}] final_responses = None try: with self.console.status("[bold blue]Thinking...", spinner="dots") as status: for responses in bot.run(messages=messages, enable_thinking=self.enable_thinking, max_tokens=self.max_tokens): final_responses = responses.pop() except Exception as e: self.console.print(f"[bold red]An error occurred during agent execution:[/] {e}") # Pretty-print the final response object if final_responses: self.console.print("\n[bold green]--- Full Response Object ---[/]") self.console.print(json.dumps(final_responses, indent=2)) self.console.print("\n[bold green]--- Extracted Content ---[/]") self.console.print(final_responses.get('content', 'No content found in response.')) else: self.console.print("[bold red]No final response received from the agent.[/]")