本项目借助 Python 和千帆平台,助力你快速构建位置服务代理机器人,为你提供便捷的位置相关服务。
按照以下步骤,你可以快速搭建并运行位置服务代理机器人。
要运行这个项目,您需要先安装以下依赖项:
pip install -r requirements.txt
创建一个 config.py 文件,并添加以下内容:
class MCPConfig:
def __init__(self):
self.app_id = "your_app_id" # 替换为您的千帆应用 ID
self.token = "your_token" # 替换为您的千帆令牌
您可以从 千帆官方文档 下载并安装 SDK。
在千帆平台上创建一个新的应用,并获取 app_id 和 token。
将 MCP Server 的环境变量设置为您的应用 ID 和令牌:
export APPBUILDER_TOKEN="your_token"
运行以下命令启动 MCP Server:
python main.py
将以下代码保存为 app.py,并按需修改路径和参数:
import os
import asyncio
import appbuilder
from appbuilder.core.console.appbuilder_client.async_event_handler import (
AsyncAppBuilderEventHandler,
)
from appbuilder.mcp_server.client import MCPClient
class MyEventHandler(AsyncAppBuilderEventHandler):
def __init__(self, mcp_client):
super().__init__()
self.mcp_client = mcp_client
def get_current_weather(self, location=None, unit="摄氏度"):
return "{} 的温度是 {} {}".format(location, 20, unit)
async def interrupt(self, run_context, run_response):
thought = run_context.current_thought
# 绿色打印
print("\033[1;31m", "-> Agent 中间思考: ", thought, "\033[0m")
tool_output = []
for tool_call in run_context.current_tool_calls:
tool_res = ""
if tool_call.function.name == "get_current_weather":
tool_res = self.get_current_weather(**tool_call.function.arguments)
else:
print(
"\033[1;32m",
"MCP工具名称: {}, MCP参数:{}\n".format(tool_call.function.name, tool_call.function.arguments),
"\033[0m",
)
mcp_server_result = await self.mcp_client.call_tool(
tool_call.function.name, tool_call.function.arguments
)
print("\033[1;33m", "MCP结果: {}\n\033[0m".format(mcp_server_result))
for i, content in enumerate(mcp_server_result.content):
if content.type == "text":
tool_res += mcp_server_result.content[i].text
tool_output.append(
{
"tool_call_id": tool_call.id,
"tool_response": tool_res
}
)
return tool_output
async def main():
# 初始化 MCP 客户端
mcp_client = MCPClient()
await mcp_client.start()
# 创建EventHandler并连接到MCP客户端
event_handler = MyEventHandler(mcp_client)
await mcp_client.register_event_handler(event_handler)
# 等待用户输入
while True:
query = input("请输入查询: ")
response = await mcp_client.process_query(query)
print("\033[1;34m", "Agent 响应:", response, "\033[0m")
if __name__ == "__main__":
asyncio.run(main())
运行 app.py,您将看到一个交互式界面,可以输入位置查询。
# 启动 MCP Server
python main.py
# 在另一个终端中启动 Agent
python app.py
输入如“北京天气如何?”或“附近有哪些酒店?”,机器人将处理并返回结果。
MCPConfig 类,添加更多位置相关服务。此项目遵循 MIT 协议。请在使用前仔细阅读许可条款。