一直以来都是用 kafka-python 这个库连接 Kafka，但现在公司对 Kafka 做了安全升级，加入了 SASL 认证，sasl.mechanisms 用的是 SCRAM-SHA-256，kafka-python 并不支持。谷歌了一下，可以换成 confluent_kafka。
pip install confluent_kafka
生产端示例代码
import json
from datetime import datetime
from confluent_kafka import Producer
# Topic the example producer publishes to.
topic_name = 'python_test'

# SASL/SCRAM authentication settings, kept apart from the broker list so the
# security-related keys are easy to spot.
# NOTE(review): the username/password are hard-coded here; presumably sample
# values for the example -- confirm before reusing this configuration.
_sasl_settings = {
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': '432425',
    'sasl.password': '534456',
}

# Complete client configuration: the comma-separated bootstrap broker list
# followed by the SASL settings (key order matches the original literal).
conf = {
    'bootstrap.servers': '10.110.18.214:8911,10.110.16.96:8911,10.110.19.242:8911',
    **_sasl_settings,
}
def delivery_report(err, msg):
    """Print the outcome of a single message delivery.

    Intended to be passed as the ``callback`` argument of
    ``producer.produce``.

    Args:
        err: ``None`` when delivery succeeded; otherwise an object describing
            the failure (it is formatted with ``str.format``).
        msg: The message the report refers to. Only consulted on success,
            where its ``topic()`` and ``partition()`` methods are called.
    """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
# Create the producer from the SASL-enabled configuration defined above.
producer = Producer(**conf)

# Example payload; the timestamp is taken once, so every message sent below
# carries the same content.
data = {
    'name': 'sheng',
    'time': str(datetime.now())
}

# The payload does not change between iterations, so serialise it once.
payload = json.dumps(data).encode()

for _ in range(10):
    # Serve delivery callbacks from previous produce() calls so that
    # delivery_report fires while sending rather than only at flush time.
    producer.poll(0)
    # produce() is asynchronous: it only enqueues the message.
    producer.produce(topic_name, payload, callback=delivery_report)

# Block until every queued message has been delivered (or has failed) and
# the remaining delivery callbacks have run.
producer.flush()
消费端代码
from confluent_kafka import Consumer
# Topic to read from and the comma-separated bootstrap broker list.
topic_name = 'python_test'
KAFKA_BROKER_SERVERS = "10.110.18.214:8911,10.110.16.96:8911,10.110.19.242:8911"

# Consumer with the same SASL/SCRAM settings as the producer example.
# 'auto.offset.reset': 'earliest' starts from the beginning of the topic
# when the group has no committed offset.
consumer = Consumer({
    'bootstrap.servers': KAFKA_BROKER_SERVERS,
    'group.id': 'test_sasl',
    'auto.offset.reset': 'earliest',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': '432425',
    'sasl.password': '534456'
})
consumer.subscribe([topic_name])

try:
    while True:
        # Wait up to one second for the next message.
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing arrived within the timeout; poll again.
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        print('Received message: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this endless example loop.
    pass
finally:
    # Previously unreachable after `while True`; always close the consumer
    # so it leaves the group cleanly, even on interrupt or error.
    consumer.close()
QQ交流群: 211426309