03-Kafka生产者

Kafka生产者

案例场景,信用卡事务处理系统:

  1. 有一个客户端应用程序,可能是一个在线商店(淘宝)
  2. 每当有支付行为发生时,负责将事务发送到Kafka上
  3. 另一个应用程序(支付宝)根据规则引擎检查这个事务,决定响应批准还是拒绝,并将响应消息写回Kafka,然后发送给发起事务的在线商店
  4. 第三个应用程序从Kafka上读取事务和审核状态,把消息保存到数据库,随后分析师对这些结果进行分析,期望借此改进规则引擎

学习任务:

  • 如何创建KafkaProducer和ProducerRecords对象
  • 如何将记录发送给Kafka
  • 如何处理从Kafka返回的错误
  • 介绍用于控制生产者行为的重要配置选项
  • 如何使用不同的分区方法和序列化器,如何自定义分区器和序列化器

第三方客户端

除了内置的客户端外,Kafka还提供了二进制连接协议。二进制连接协议的意思是,我们直接向Kafka网络端口发送适当的字节序列,就可以实现从Kafka从读取或写入消息。根据二进制连接协议,其它各种语言都可以实现客户端,使得Kafka不仅仅局限在Java里面使用

生产者概述

应用程序向Kafka中写入消息的应用场景:

  • 记录用户的活动(用于审计和分析)
  • 记录度量指标
  • 保存日志消息
  • 与其它应用程序进行异步通信
  • 缓冲即将写入到数据库的数据

多样的使用场景意味着多样的需求:

  • 是否每个消息都很重要?是否允许一小部分消息的丢失
  • 偶尔出现重复消息是否可以接受?
  • 是否有严格的延迟和吞吐量的要求?

信用卡事务处理系统是一种应用场景:消息丢失和消息重复是不允许的,可以接受的延迟为500ms,对吞吐量要求较高(100w/s)

保存网站点击信息是另一种应用场景:允许丢失少量的消息或出现少量的消息重复,延迟可以高一些,只要不影响用体验就行。

生产者组件图

流程:

  1. 先创建一个 ProducerRecord 对象,该对象包含目标主题topic和要发送的内容value,还可以指定键和分区

    思考:如何理解可以指定键和分区?

    在默认情况下,默认的分区器(分区策略)是对键的hash值取模。这不就意味着默认情况下一定会有键key?那是不是意味着键在不指定的情况下会自动生成?那生成策略又是什么呢?

    答:对键的hash值取模是默认的分区器的分区策略,但是这在对键不为null的情况下才成立。当键为null时,会随机分配。Kafka使用自己的hash散列算法,优点是Java版本升级不会对其造成影响。降低耦合度

  2. 在发送 ProducerRecord 对象时,需要通过序列化器将key和value进行序列化,转为 byte[] 后再在网络上传输。

    主题和分区信息用于告诉生产者(Kafka客户端)本地的发送线程发送到哪个目的地,没有必要在网络上传输,也就没有必要序列化

  3. 接着 byte[] 传给分区器,分区器根据 key 来计算一个分区。注意,在指定了分区的情况下,分区器不会进行任何操作,即指定的分区优先。

  4. 确定好主题topic和分区partition之后,ProducerRecord 对象会被添加到一个批次里(批次本质上是列表),再通过一个独立的线程将一批数据发送到相应的服务器broker中

  5. 服务器broker收到消息后,返回一个响应。

    • 如果消息写入成功,就返回一个RecordMetaData对象,其中包含主题和分区信息,以及记录在分区里的偏移量
    • 如果消息写入失败,就会返回一个错误。生产者在收到错误信息后会通过重试机制尝试一定次数,如果还是失败则最终返回错误信息。

    服务器收到的是批次,也就是多条记录(消息),那这里是针对一条消息就返回一个响应吗?例如1个批次中存在10条记录,那么会返回10条响应?如果是的话,应该也是也批次的形式返回的吧!是否可能在同一批次中,出现1,3记录写入成功,但是2记录写入失败的情况?这种情况应该可能出现吧。

Kafka生产者的创建和配置项

表:生产者常用配置项
必要字段 可选字段 作用
bootstrap.servers 指定broker的地址,格式为host:port
不需要包含Kafka集群中所有的broker地址,Kafka集群内部会维护,找到一台broker就相当于找打全部的broker
出于故障容错的考虑,建议至少提供两个broker的信息
key.serializer 将消息的key对象序列化成byte[],Kafka默认提供了ByteArraySerializer、StringSerializer、IntegerSerializer
如果使用了自定义类型,那么就需要实现一个自定义的序列化器
value.serializer 对消息的value对象进行序列化
acks
buffer.memory
compression.type
retries
batch.size
linger.ms
client.id
max.in.flight.requests.per.connection
timeout.ms
request.timeout.ms
metadata.fetch.timeout.ms
max.blocks.ms
max.request.size
receive.buffer.bytes
send.buffer.bytes

提高吞吐量:

  • 增加单个生产者内的线程数量(多线程)
  • 增加生产者数量(多生产者)

发送消息

消息的发送方式:

  • 发送并忘记fire-and-forget

    如果不关心发送结果,可以使用该方式。例如,记录不太重要的应用程序日志

  • 同步发送

  • 异步发送

生产者(客户端)可能发生的异常:

  • 序列化消息失败SerializationException
  • 缓冲区已满BufferExhaustedException或TimeoutException
  • 发送线程被中断InterruptException

同步发送消息

TODO

异步发送消息

TODO

自定义序列化器

如果发送到Kafka的对象不是简单的字符串或整数,那么可以使用通用的序列化框架来创建记录(ProducerRecord),例如使用Avro、Thrift、Protobuf等。为了更好的理解这些序列化框架,需要学习如何自定义一个序列化器。

序列化器的本质上还是如何将一个对象变为字节数组,并且不仅要将数据保存下来,还需要将数据的长度保存下来。格式约定死的,比如putInt()则对应getInt(),这是由程序员手工对应上的。

分区

默认分区器的缺点:同一个key总是被映射到同一个分区partition上,所以在进行映射时,会使用topic包括不可用分区在内的所有分区。这意味着,当key映射到不可用分区时,就会发生错误。

使用类似默认分区器这种通过键hash取模来映射分区的策略时,需要在创建主题的时候就把分区规划好,并且永远不要增加新分区(缺点造成的问题的解决方案)。

分区写入策略

  • 轮询分区策略

    当消息的 key 为 null 时,会使用轮询分区策略,均匀分配

  • 随机分区策略

  • 按 key 分区策略

    当 消息的 key 不为 null 时,此时作为默认分区策略

  • 自定义分区策略

乱序问题

一个分区内部的数据是有序的,但是主题内部的数据是无序的。

以一个字符串为例,可以认为每个分区就对应了一个子序列。因此一个字符串就被拆分成多个不相交的子序列,子序列是有序的,但是序列间是无序的。

Kafka 解决乱序问题只能将数据导入一个分区中,但是这就失去了 Kafka 分布式的意义。

消息不丢失机制

生产者数据不丢失

ACK机制:

  • 当ACK设置为-1时,分区的所有副本都写入成功后才响应生产者。能保证消息一定不丢失,换句话说,如果有任何一个分区副本写入失败,那么生产者就会发现写入失败,进而可以通过重试机制来重新发送消息。效率低,适用于对消息安全性要求严格的场景
  • 当ACK设置为1时,分区的leader写入成功后就响应生产者。如果在leader写入成功后,但是在follower拉取之前,leader所在的broker宕机,那么就会产生消息丢失的问题。其它follower没有最新的数据副本,但是生产者得到写入成功的响应。
  • 当ACK设置为0时,生产者不关心消息是否写入成功。效率高,适用于那些不重要数据的保存,例如用户点击操作日志,丢失一部分没有影响。

重试机制:

同步发送时,如果没有得到leader的响应,那么生产者就不会发送下一条消息

异步发送时,回调函数能够得到一个异常,如果出现异常,那么重新发送一次。

消费者数据不丢失

假设消费者在t=0时刻从Kafka中拉取数据,并开始业务的处理逻辑,但是该处理流程耗时较长,还未处理完成时触发Kafka的offset自动提交机制,假设在t=5时刻提交offset(即offset向后偏移,消息被消费),但是该consumer在提交完成后恰好宕机,在该consumer重连之后试图重新开始该业务逻辑,但是此时数据丢失。

消息传递:

At-most once:至多一次

At-least once:至少一次

Exactly-Once:仅有一次

消息丢失:offset提交成功,但是consumer的业务处理流程失败

重复消费:consumer的业务处理流程成功,但是offset提交失败

仅有一次:将consumer的业务处理流程和offset提交放入到一个事务中,要么都成功,要么都失败。疑问:如果offset提交到MySQL中,但是业务处理流程和MySQL无关,那么又如何实现呢?如果业务处理流程的系统不支持事务机制,该如何实现呢?

数据积压

消费者的消费能力跟不上生产者的生产能力,导致offset < end


   转载规则


《03-Kafka生产者》 熊水斌 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
02-安装Kafka 02-安装Kafka
单机测试创建主题 # 主题名为test kafka-topics.sh \ --create \ --zookeeper hadoop001:2181 \ --replication-factory 1 \ --partitions 1 \
2023-05-27
下一篇 
01-认识Kafka 01-认识Kafka
认识Kafka每个应用程序都会产生数据:日志消息、度量指标、用户活动记录、响应消息等。数据为企业发展提供动力,从数据中获取信息,对它们进行分析处理,然后生成更多的数据。 我们把数据从源头移动到可以对它们进行分析处理的地方,然后把得到的结果应
2023-05-27
  目录