Python实战Kafka生产者API

在《Kafka快速开始》中简单描述了Kafka如何发送和消费消息,使用的是内置的命令行操作,在现实中,肯定要使用语言级别的API操作,Kafka原生的API是Java语言写的,我对Java不太熟悉,所以找了个Python的API,这就是kafka-python,和Java API很类似,安装非常简单 pip3 install kafka-python

今天简单说下生产者API,Kafka和http这样的传输协议很不一样,客户端会做很多的操作,所以在理解的时候必须改变这个观念,尤其消费者API使用更让人抓狂。

首先理解下客户端API的运行机制,包含两个步骤,看上去很简单,但内部工作并不那么容易理解。KafkaProducer()调用会产生一个ProducerRecord对象,它是一个pool缓存空间,里面包含了很多还没有发给broker的消息记录,同时还有I/O线程对象,用于将记录转化为请求,并发送给broker集群。

那么pool里面放的消息记录哪儿来呢?send()会发送具体的消息,其中发送者API内部有个分区器,用于将发送到同一个分区的消息放入到一个批次中(减少发送频率)。

send()是个异步方法,一旦调用后就立刻返回,调用100次send,实际的网络传输可能只有10次(比如一个批次中的消息只用发送一次),这样客户端吞吐量就变大了,但没法得到具体的 响应结果 ,可能就会丢一些消息,在Java生产者客户端中,send()支持回调,当失败的时候最终会告诉调用者(如果是部分消息发送失败怎么告诉调用者?),而Python生产者客户端没有看到回调方法。

send()方法后面如果紧跟get()方法,那么就是一个同步操作,这个对 单条消息 发送尤其重要,能够知道消息是否发送成功。如果broker响应成功,则会返回一个RecodMetaDate对象,包含了主题和分区信息,还有偏移量信息,如果失败则会返回一个错误。

生产者API认为Kafka是高可用的服务,虽然broker响应结果很重要,但有的时候调用者不一定要关注,相当于“你交给我任务,就放心吧”,生产者API遇到一些临时性的错误会进行重试(比如说leader分区暂时不可用),另外一些错误则是会在调用send()的时候立刻返回(比如消息太大),潜意识告诉我们只要send()没有立刻返回错误,理论上最终会发送成功。

上面说了一些基础理论,其实生产者API有很多参数,接下去我们逐一讲解,加强理解。

首先执行下面的命令,用于配置一个集群,一共有三个broker,pytest主题有三个分区,副本也是三个:

$ C:\fbs\kafka2.3\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic pytest

1:查看分区信息

producer = KafkaProducer(bootstrap_servers='localhost:9092,localhost:9093')
partitions_meta=producer.partitions_for('pytest')
print(partitions_meta)

需要留意的是,即使集群中有三个broker,bootstrap_servers也可以仅仅指定一个broker地址,它内部会找出其他broker的信息,不过不建议仅配置一个broker,因为这个节点可能会遇到网络问题。

2:错误返回

from kafka import KafkaProducer
from kafka.errors import KafkaError
try :
    producer = KafkaProducer(bootstrap_servers='localhost:9091',retries=4) 
    future = producer.send('pytest', b'another_message3')
except   KafkaError  as  e:
    print (e)

由于上述9091 broker并不存在,所以立刻会返回一个错误。

3:压缩和重试:

producer = KafkaProducer(bootstrap_servers='localhost:9092',retries=4,compression_type='gzip')

压缩对于Kafka性能和容量至关重要,而retries表示遇到临时性错误会重试的次数,我们在写http调用代码的时候,一般要自己写代码进行重试,而Kafka生产者API帮你做了这些事情。

4:选择分区

future = producer.send('pytest', b'another_message3',partition=1)
future = producer.send('pytest', b'another_message3',key=b'k')

如果partition参数不为空,那么这条消息就会发送到指定的主题分区中去,partition默认是空的(它使用partitioner配置参数,该参数是一个回调函数,用于选择一个Hash算法,默认是murmur2算法),如果指定了key参数,那么一个相同的key其对应的消息会分配到相同的分区中(算法取决于partitioner),如果没有指定key,那么一条消息会随机发送到一个分区中(负载均衡)。

5:异步发送

try :
    producer = KafkaProducer(bootstrap_servers='localhost:9092') 
    for _ in range(100):
        future = producer.send('pytest', b'another_message3')
except   KafkaError  as  e:
    print (e)

在一个生产者pool中,一次发送100条消息,面临最终的问题就是python生产者API没有回调方法,不知道具体的返回信息。

6:同步发送

try :
    producer = KafkaProducer(bootstrap_servers='localhost:9092',acks="all") 
    future = producer.send('pytest', b'another_message3')
    result = future.get(timeout=60)
    print (result,result.partition,result.topic_partition[0]) 
except   KafkaError  as  e:
    print (e)

如果调用get()方法,那么这就是一个阻塞操作,如果发送一个很重要短消息,可以使用阻塞操作,能够告诉你消息的最终发送结果。

在同步发送中,acks参数非常重要,Kafka是一个多副本的存储,acks默认是1,表示leader分区成功写入就可以了,如果是all,表示其他分区同步完成,才代表这个操作结束(消息存储到所有副本中了)。

7:序列化器

Kafka使用字节序列传输,所以生产者API必须指定序列化器,key_serializer额value_serializer用于将key和value对象转换为bytes类型,默认使用字符串序列化器,如果不满足需求,需要自己创建。

try :
    producer = KafkaProducer(bootstrap_servers='localhost:9092',key_serializer=str.encode,value_serializer=lambda v: json.dumps(v).encode('utf-8')) 
    producer.send('pytest', key='k2', value={'foo': 'bar'})
except   KafkaError  as  e:
    print (e)

8:flush

生产者给每个分区未发送的消息维护了一个buffers,batch_size参数配置了这个buffers的大小(字节),该值越大,每一分区包含的带发送消息就越多,这样能够减少网络开销。

需要说明的是,buffers即使没满,消息也可能立刻发送出去(I/O线程只要可用),如果你希望降低发送频率,可以配置linger_ms参数(默认为0),如果linger_ms大于0(单位毫秒),则生产者在发送之前会等待linger_ms毫秒,这样会减少网络开销。

flush()方法会立刻发送某个分区缓存区中的消息(即使linger_ms配置为大于0),并阻塞相关联的请求,但不影响其他I/O线程处理。

try :
    producer = KafkaProducer(bootstrap_servers='localhost:9092',batch_size=10000,linger_ms=100)
    for _ in range(100):
        future = producer.send('pytest', b'another_message3') 
    producer.flush()
except   KafkaError  as  e:
    print (e)

下一篇会说消费者API,比生产者API复杂多了,也能让我们更好的理解Kafka内部机制。

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章