这是一个基于 MCP(模型上下文协议)协议的 Kafka 工具,借助大模型实现与 Kafka 的连接。它具备多种实用功能,可查询指定主题信息、在指定主题中读写数据,还能回放指定时间和主题的数据。
pip install -r requirements.txt
直接运行 mcp_kafka_server.py 文件:
python mcp_kafka_server.py
之后在 MCP 客户端(如 Claude、Cherry Studio 等)中连接本地 MCP 服务器。
运行 mcp_kafka_sse_server.py 文件启动 Web 服务:
python mcp_kafka_sse_server.py
服务将启动在 http://localhost:8000,MCP 端点为 http://localhost:8000/mcp。然后在 MCP 客户端中连接这个远程 MCP 服务器。
list_topics()
此功能可返回 Kafka 中所有可用的主题列表。
get_topic_info(topic_name)
该功能用于获取指定主题的详细信息,涵盖分区数量、每个分区的起始和结束偏移量以及消息数量。
send_message(topic_name, message, key)
向指定主题发送消息,message 可以是 JSON 字符串或普通字符串,key 为可选参数。
read_messages(topic_name, max_messages, timeout_ms)
从指定主题读取最新的消息,max_messages 指定最大读取消息数(默认 10 条),timeout_ms 指定超时时间(默认 1000 毫秒)。
replay_messages(topic_name, start_time, end_time, max_messages)
回放指定时间段内的消息,start_time 和 end_time 为 ISO 格式的时间字符串(如 '2023-01-01T00:00:00'),max_messages 指定最大回放消息数(默认 100 条)。
默认连接到 localhost:9092,如需修改连接地址,请在 KafkaManager 类的初始化参数中修改 bootstrap_servers。