Kafka生产者
案例场景,信用卡事务处理系统:
- 有一个客户端应用程序,可能是一个在线商店(淘宝)
- 每当有支付行为发生时,负责将事务发送到Kafka上
- 另一个应用程序(支付宝)根据规则引擎检查这个事务,决定响应批准还是拒绝,并将响应消息写回Kafka,然后发送给发起事务的在线商店
- 第三个应用程序从Kafka上读取事务和审核状态,把消息保存到数据库,随后分析师对这些结果进行分析,期望借此改进规则引擎
学习任务:
- 如何创建KafkaProducer和ProducerRecords对象
- 如何将记录发送给Kafka
- 如何处理从Kafka返回的错误
- 介绍用于控制生产者行为的重要配置选项
- 如何使用不同的分区方法和序列化器,如何自定义分区器和序列化器
第三方客户端
除了内置的客户端外,Kafka还提供了二进制连接协议。二进制连接协议的意思是,我们直接向Kafka网络端口发送适当的字节序列,就可以实现从Kafka从读取或写入消息。根据二进制连接协议,其它各种语言都可以实现客户端,使得Kafka不仅仅局限在Java里面使用。
生产者概述
应用程序向Kafka中写入消息的应用场景:
- 记录用户的活动(用于审计和分析)
- 记录度量指标
- 保存日志消息
- 与其它应用程序进行异步通信
- 缓冲即将写入到数据库的数据
多样的使用场景意味着多样的需求:
- 是否每个消息都很重要?是否允许一小部分消息的丢失
- 偶尔出现重复消息是否可以接受?
- 是否有严格的延迟和吞吐量的要求?
信用卡事务处理系统是一种应用场景:消息丢失和消息重复是不允许的,可以接受的延迟为500ms,对吞吐量要求较高(100w/s)
保存网站点击信息是另一种应用场景:允许丢失少量的消息或出现少量的消息重复,延迟可以高一些,只要不影响用体验就行。

流程:
先创建一个 ProducerRecord 对象,该对象包含目标主题topic和要发送的内容value,还可以指定键和分区
思考:如何理解可以指定键和分区?
在默认情况下,默认的分区器(分区策略)是对键的hash值取模。这不就意味着默认情况下一定会有键key?那是不是意味着键在不指定的情况下会自动生成?那生成策略又是什么呢?
答:对键的hash值取模是默认的分区器的分区策略,但是这在对键不为null的情况下才成立。当键为null时,会随机分配。Kafka使用自己的hash散列算法,优点是Java版本升级不会对其造成影响。降低耦合度
在发送 ProducerRecord 对象时,需要通过序列化器将key和value进行序列化,转为 byte[] 后再在网络上传输。
主题和分区信息用于告诉生产者(Kafka客户端)本地的发送线程发送到哪个目的地,没有必要在网络上传输,也就没有必要序列化
接着 byte[] 传给分区器,分区器根据 key 来计算一个分区。注意,在指定了分区的情况下,分区器不会进行任何操作,即指定的分区优先。
确定好主题topic和分区partition之后,ProducerRecord 对象会被添加到一个批次里(批次本质上是列表),再通过一个独立的线程将一批数据发送到相应的服务器broker中
服务器broker收到消息后,返回一个响应。
- 如果消息写入成功,就返回一个RecordMetaData对象,其中包含主题和分区信息,以及记录在分区里的偏移量
- 如果消息写入失败,就会返回一个错误。生产者在收到错误信息后会通过重试机制尝试一定次数,如果还是失败则最终返回错误信息。
服务器收到的是批次,也就是多条记录(消息),那这里是针对一条消息就返回一个响应吗?例如1个批次中存在10条记录,那么会返回10条响应?如果是的话,应该也是也批次的形式返回的吧!是否可能在同一批次中,出现1,3记录写入成功,但是2记录写入失败的情况?这种情况应该可能出现吧。
Kafka生产者的创建和配置项
必要字段 | 可选字段 | 作用 |
---|---|---|
bootstrap.servers |
指定broker的地址,格式为host:port 不需要包含Kafka集群中所有的broker地址,Kafka集群内部会维护,找到一台broker就相当于找打全部的broker 出于故障容错的考虑,建议至少提供两个broker的信息 |
|
key.serializer |
将消息的key对象序列化成byte[],Kafka默认提供了ByteArraySerializer、StringSerializer、IntegerSerializer 如果使用了自定义类型,那么就需要实现一个自定义的序列化器 |
|
value.serializer |
对消息的value对象进行序列化 | |
acks |
||
buffer.memory |
||
compression.type |
||
retries |
||
batch.size |
||
linger.ms |
||
client.id |
||
max.in.flight.requests.per.connection |
||
timeout.ms |
||
request.timeout.ms |
||
metadata.fetch.timeout.ms |
||
max.blocks.ms |
||
max.request.size |
||
receive.buffer.bytes |
||
send.buffer.bytes |
提高吞吐量:
- 增加单个生产者内的线程数量(多线程)
- 增加生产者数量(多生产者)
发送消息
消息的发送方式:
发送并忘记fire-and-forget
如果不关心发送结果,可以使用该方式。例如,记录不太重要的应用程序日志
同步发送
异步发送
生产者(客户端)可能发生的异常:
- 序列化消息失败SerializationException
- 缓冲区已满BufferExhaustedException或TimeoutException
- 发送线程被中断InterruptException
同步发送消息
TODO
异步发送消息
TODO
自定义序列化器
如果发送到Kafka的对象不是简单的字符串或整数,那么可以使用通用的序列化框架来创建记录(ProducerRecord),例如使用Avro、Thrift、Protobuf等。为了更好的理解这些序列化框架,需要学习如何自定义一个序列化器。
序列化器的本质上还是如何将一个对象变为字节数组,并且不仅要将数据保存下来,还需要将数据的长度保存下来。格式约定死的,比如putInt()则对应getInt(),这是由程序员手工对应上的。
分区
默认分区器的缺点:同一个key总是被映射到同一个分区partition上,所以在进行映射时,会使用topic包括不可用分区在内的所有分区。这意味着,当key映射到不可用分区时,就会发生错误。
使用类似默认分区器这种通过键hash取模来映射分区的策略时,需要在创建主题的时候就把分区规划好,并且永远不要增加新分区(缺点造成的问题的解决方案)。
分区写入策略
轮询分区策略
当消息的 key 为 null 时,会使用轮询分区策略,均匀分配
随机分区策略
按 key 分区策略
当 消息的 key 不为 null 时,此时作为默认分区策略
自定义分区策略
乱序问题
一个分区内部的数据是有序的,但是主题内部的数据是无序的。
以一个字符串为例,可以认为每个分区就对应了一个子序列。因此一个字符串就被拆分成多个不相交的子序列,子序列是有序的,但是序列间是无序的。
Kafka 解决乱序问题只能将数据导入一个分区中,但是这就失去了 Kafka 分布式的意义。
消息不丢失机制
生产者数据不丢失
ACK机制:
- 当ACK设置为-1时,分区的所有副本都写入成功后才响应生产者。能保证消息一定不丢失,换句话说,如果有任何一个分区副本写入失败,那么生产者就会发现写入失败,进而可以通过重试机制来重新发送消息。效率低,适用于对消息安全性要求严格的场景
- 当ACK设置为1时,分区的leader写入成功后就响应生产者。如果在leader写入成功后,但是在follower拉取之前,leader所在的broker宕机,那么就会产生消息丢失的问题。其它follower没有最新的数据副本,但是生产者得到写入成功的响应。
- 当ACK设置为0时,生产者不关心消息是否写入成功。效率高,适用于那些不重要数据的保存,例如用户点击操作日志,丢失一部分没有影响。
重试机制:
同步发送时,如果没有得到leader的响应,那么生产者就不会发送下一条消息
异步发送时,回调函数能够得到一个异常,如果出现异常,那么重新发送一次。
消费者数据不丢失
假设消费者在t=0时刻从Kafka中拉取数据,并开始业务的处理逻辑,但是该处理流程耗时较长,还未处理完成时触发Kafka的offset自动提交机制,假设在t=5时刻提交offset(即offset向后偏移,消息被消费),但是该consumer在提交完成后恰好宕机,在该consumer重连之后试图重新开始该业务逻辑,但是此时数据丢失。
消息传递:
At-most once:至多一次
At-least once:至少一次
Exactly-Once:仅有一次
消息丢失:offset提交成功,但是consumer的业务处理流程失败
重复消费:consumer的业务处理流程成功,但是offset提交失败
仅有一次:将consumer的业务处理流程和offset提交放入到一个事务中,要么都成功,要么都失败。疑问:如果offset提交到MySQL中,但是业务处理流程和MySQL无关,那么又如何实现呢?如果业务处理流程的系统不支持事务机制,该如何实现呢?
数据积压
消费者的消费能力跟不上生产者的生产能力,导致offset < end