Python怎么用Kafka搭个完整例子,边学边用实操分享
- 问答
- 2025-12-31 14:22:35
- 3
综合参考了知乎专栏“Kafka实战:从零开始搭建你的第一个Python应用”、CSDN博客“Python操作Kafka的保姆级教程”以及B站视频“手把手教你用Python玩转Kafka”的核心演示步骤)
好,咱们直接开干,想用Python和Kafka搭个能跑起来的例子,最实在的就是自己动手敲一遍,别管那些复杂的理论,先让消息发出去、收回来,有了感觉再深入,下面我就带你一步一步实现一个最简单的“生产者-消费者”模型,也就是一个程序负责发送消息,另一个程序负责接收消息。
第一步:准备工作——安装必要的库
你得确保电脑上已经有Python了,我们需要安装一个Python库,它就像是Python和Kafka之间的翻译官,这个库叫kafka-python,在命令行里用pip安装就行(来源:CSDN博客)。
打开你的终端(Windows叫命令提示符或PowerShell,Mac叫终端),输入下面这行命令然后回车:
pip install kafka-python
如果安装速度慢,可以试试加上清华的镜像源:pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple。
第二步:启动Kafka服务器(Broker)
光有Python库还不够,我们需要一个Kafka服务器来接收和存储消息,最方便的方法是使用Kafka官方自带的脚本快速启动一个单节点的Kafka(来源:B站视频演示流程)。
- 下载Kafka: 去Apache Kafka官网(kafka.apache.org)下载最新版本的二进制文件,比如
kafka_2.13-3.6.1.tgz(版本号可能会变),解压到一个你找得到的位置。 - 启动ZooKeeper: Kafka依赖ZooKeeper来管理元数据,打开一个终端窗口,进入你解压Kafka的目录,然后运行:
bin/zookeeper-server-start.sh config/zookeeper.properties
(Windows系统是:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties) 这个窗口会一直运行,别关它。
- 启动Kafka Server: 再打开一个新的终端窗口,同样进入Kafka目录,运行:
bin/kafka-server-start.sh config/server.properties
(Windows系统是:
bin\windows\kafka-server-start.bat config\server.properties) 这个窗口也会持续运行,你的简易Kafka环境就准备好了。
第三步:编写Python生产者代码
现在我们来写发送消息的Python程序,创建一个新文件,比如叫producer.py。
# 导入Kafka生产者类
from kafka import KafkaProducer
# 1. 创建一个生产者实例
# bootstrap_servers:告诉生产者Kafka服务器在哪里,本地就是'localhost:9092'
# value_serializer:把我们要发送的字符串消息转换成Kafka需要的字节格式
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: x.encode('utf-8')
)
# 2. 准备要发送的消息主题(Topic),主题就像是消息的类别或者频道,消费者会订阅它。
topic_name = 'my_first_topic'
# 3. 发送几条测试消息
for i in range(5):
message = f"Hello Kafka! 这是第 {i+1} 条消息"
# 使用send方法发送消息,指定主题和消息内容
producer.send(topic_name, value=message)
print(f"已发送: {message}")
# 4. 确保所有消息都发送出去(清空缓冲区)
producer.flush()
print("所有消息发送完毕!")
# 5. 关闭生产者连接
producer.close()
第四步:编写Python消费者代码
再创建一个新文件,叫consumer.py,用来持续监听并接收消息。

# 导入Kafka消费者类
from kafka import KafkaConsumer
# 1. 创建一个消费者实例
# bootstrap_servers:同样指定Kafka服务器地址
# auto_offset_reset:当没有初始偏移量时,从最早的消息开始读取('earliest')
# value_deserializer:把接收到的字节数据解码成字符串
# group_id:消费者组ID,同一个组内的消费者共同消费一个主题,这里我们先简单设一个名字
consumer = KafkaConsumer(
'my_first_topic', # 直接在这里指定要订阅的主题
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
value_deserializer=lambda x: x.decode('utf-8'),
group_id='my_test_group'
)
print("开始监听消息... (按 Ctrl+C 停止)")
# 2. 消费者是一个持续不断的迭代器,会一直等待新消息
try:
for message in consumer:
# 打印接收到的消息详情
print(f"""
收到消息!
主题: {message.topic}
分区: {message.partition}
偏移量: {message.offset}
键: {message.key}
值: {message.value}
""")
except KeyboardInterrupt:
print("停止监听。")
# 3. 关闭消费者
finally:
consumer.close()
第五步:运行和测试
- 确保ZooKeeper和Kafka Server那两个终端窗口还在运行。
- 先运行消费者程序,让它进入等待状态,打开一个新的终端,输入:
python consumer.py
你会看到输出“开始监听消息...”。
- 再打开一个新的终端,运行生产者程序:
python producer.py
生产者终端会快速打印出5条“已发送”的信息。
- 立刻切换回运行消费者的终端窗口,你会看到它已经收到了那5条消息,并打印出了每条消息的详细信息。
恭喜你!你已经成功搭建了一个最基础的Kafka消息流,生产者像是一个说话的人,消费者像是一个听话的人,Kafka服务器就是他们之间的传声筒和记事本,确保消息不会丢失。
边学边用的下一步:
- 玩一下主题管理: 在Kafka目录下,你可以用命令
bin/kafka-topics.sh --list --bootstrap-server localhost:9092查看当前有哪些主题。 - 试试多个消费者: 同时开两个
consumer.py的终端,再运行一次producer.py,观察消息是怎么被分配的(默认是负载均衡)。 - 修改消息内容: 在生产者代码里发送一些更复杂的数据,比如字典,你可能需要用到
json_serializer。
这个过程虽然简单,但包含了Kafka最核心的生产-消费模型,通过这个实操,你再去看那些“分区”、“副本”、“消费者组”等概念,就会觉得具体多了。
本文由太叔访天于2025-12-31发表在笙亿网络策划,如有疑问,请联系我们。
本文链接:http://waw.haoid.cn/wenda/71923.html
