分布式消息中间件之Kafka
Kafka
Apache Kafka 是一个高吞吐量的分布式事件流处理平台(分布式发布/订阅消息系统),由Scala 和 Java 语言写成,主要功能如下:
- 发布(写入)和订阅(读取)事件流,包括从其他系统持续导入/导出数据。
- 根据需要持久可靠地存储事件流。
- 在事件发生时或回顾性地处理事件流。
所有这些功能都以分布式、高度可扩展、弹性、容错和安全的方式提供。 Kafka 可以部署在裸机硬件、虚拟机和容器上,也可以部署在本地和云端。
发展历史
Kafka最初是由领英(LinkedIn)开发,于2011年开源,并于2012年从Apache孵化出站。
2014年11月,几个曾在领英为Kafka工作的工程师,创建了名为Confluent 的新公司,着眼于Kafka,2021年6月,Confluent 在美国纳斯达克上市,市值超过100亿美元。
版本 | 时间 | 说明 |
---|---|---|
0.7.0.0 | 2012年 | |
0.8.0.0 | 2013年 | 增加副本机制 |
0.9.0.0 | 2015年 | 引入了新的组件 Kafka Connect;完善架构安全方面 |
0.10.0.0 | 2016年 | 新增Kafka Stream API |
0.11.0.0 | 2017年 | 重构了控制器的底层设计;支持幂等性和事务 |
1.0.0 | 2017年 | |
2.0.0 | 2018年 | |
2.8.0 | 2021年 | |
3.0.0 | 2021年 | |
3.2.0 | 2022年 | |
3.3.1 | 2022年10月 | 对 KRaft进行完善升级,将 KRaft 标记为生产就绪 |
4.0.0 | 2025-03 | 完全废除Zookper的依赖 |
使用目的
异步解耦:
同步调用转换成异步消息通知,实现生产者和消费者的解耦。想象一个场景,在商品交易时,订单创建完成之后,需要触发一系列其他的操作,比如进行用户订单数据的统计、给用户发送短信、给用户发送邮件等等。如果所有操作都采用同步方式实现,将严重影响系统性能。针对此场景,我们可以利用消息中间件解耦订单创建操作和其他后续行为。
削峰填谷:
利用 Broker 缓冲上游生产者瞬时突发的流量,使消费者消费流量整体平滑。对于发送能力很强的上游系统,如果没有消息中间件的保护,下游系统可能会直接被压垮导致全链路服务雪崩。想象秒杀业务场景,上游业务发起下单请求,下游业务执行秒杀业务(库存检查,库存冻结,余额冻结,生成订单等等),下游业务处理的逻辑是相当复杂的,并发能力有限,如果上游服务不做限流策略,瞬时可能把下游服务压垮。针对此场景,我们可以利用 MQ 来做削峰填谷,让高峰流量填充低谷空闲资源,达到系统资源的合理利用。
应用场景
以下是 Apache Kafka 的几个主要应用场景:
- 数据存储
- 构建日志分析系统
- 流计算处理
- 网站活动跟踪
数据存储
近年来KV存储(HBase)、搜索(ElasticSearch)、流式处理(Storm/Spark Streaming/Samza)、时序数据库(OpenTSDB)等专用系统应运而生,产生了同一份数据集需要被注入到多个专用系统内的需求。利用大数据消息中间件 Kafka 作为数据中转枢纽,同份数据可以被导入到不同专用系统中。

构建日志分析系统
大型分布式系统每天都会产生大量的日志。Kafka 性能高效,采集日志时业务无感知以及Hadoop/ODPS 等离线仓库存储和 Storm/Spark 等实时在线分析对接的特性决定它非常适合作为”日志收集中心”。

流计算处理
股市走向分析、气象数据测控、网站用户行为分析等领域,由于数据产生快、实时性强、数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需求。而大数据消息中间件 Kafka 以及 Storm/Samza/Spark 等流计算引擎的出现,可以根据业务需求对数据进行计算分析,最终把结果保存或者分发给需要的组件。
网站活动跟踪
通过Kafka 可以实时收集网站活动数据(包括用户浏览页面、搜索及其他行为等)。发布-订阅的模式可以根据不同的业务数据类型,将消息发布到不同的 Topic;还可通过订阅消息的实时投递,将消息流用于实时监控与业务分析或加载到 Hadoop、ODPS 等离线数据仓库系统进行离线处理。
高吞吐
网站所有用户产生的行为信息极为庞大,需要非常高的吞吐量来支持;
大数据分析
可对接 Storm/Spark 实时流计算引擎,也可以对接 Hadoop/ODPS 等离线数据仓库系统。
架构及组件
一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker 、若干 Consumer ,以及ZooKeeper 集群。


核心组件:
名词 | 解释 |
---|---|
Broker | 节点服务器,一个Kafka节点就是一个Broker,一个Kafka集群由多个Broker组成; |
Topic | 主题,Kafka 对消息进行分类,发送到集群的每一条消息都要指定Topic; |
Partition | 分区,每个Topic包含一个或多个Partition,一个Partition对应一个文件夹,这个文件夹下存储Partition的数据和索引文件,每个Partition内部的数据都是有序的。 |
Segment | 数据段文件,每个分区由大小相同的数据段文件组成,每个数据文件由数据和索引两部分组成; |
Producer | 生产者,负责发布消息到Broker |
Consumer | 消费者,从Broker 读取消息; |
Consumer Group | 每个消费者属于一个特定的消费者组,可为每个consumer指定group name,若不指定,则属于默认的group,一条消息可以发送到不同的consumer group,但一个consumer group中只能有一个consumer能消费这条消息; |
Zookeeper | Kafka集群的协调者,保存节点元数据 |
Broker Controller | 特殊的节点服务器,一个Kafka集群中只有一个Broker Controller,额外负责集群的管理工作。 |
核心概念:
- 生产者分区策略
- 分区副本 Replica (一个主副本Leader、多个从副本 Follower)
- 偏移量 Consumer Offset
- 消费者重平衡 Rebalance
发布与订阅
kafka是发布与订阅模式,这个订阅者是消费组,而不是消费者实例。每一条消息只会被同一个消费组里的一个消费者实例消费,不同的消费组可以同时消费同一条消息,
Kafka 中的主题始终是多生产者和多订阅者:一个主题可以有零个、一个或多个向其写入事件的生产者,以及零个、一个或多个订阅这些事件的消费者。主题中的事件可以根据需要随时读取——与传统的消息传递系统不同,事件在消费后不会被删除。相反,您可以通过每个主题的配置设置来定义 Kafka 应该将您的事件保留多长时间,之后旧事件将被丢弃。
Kafka 的性能在数据大小方面实际上是恒定的,不会随着数据量的增大而效率降低。
Topic 是分区的,这意味着主题分布在位于不同 Kafka 代理上的多个“桶”中。数据的这种分布式放置对于可伸缩性非常重要,因为它允许客户端应用程序同时从多个代理读取和写入数据。当一个新事件发布到一个主题时,它实际上是附加到主题的分区之一。具有相同事件键(例如,客户或车辆 ID)的事件被写入同一个分区,并且 Kafka 保证给定主题分区的任何消费者将始终以与写入事件完全相同的顺序读取该分区的事件。

如上图:此示例主题有四个分区 P1–P4。 两个不同的生产者客户端通过网络将事件写入主题的分区,彼此独立地向主题发布新事件。 具有相同键的事件(在图中由它们的颜色表示)被写入同一个分区。 请注意,如果合适的话,两个生产者都可以写入同一个分区
功能特性
- 高吞吐、低延时:这是 Kafka 显著的特点,Kafka 能够达到百万级的消息吞吐量,延迟可达毫秒级;
- 持久化存储:Kafka 的消息最终持久化保存在磁盘之上,提供了顺序读写以保证性能,并且通过 Kafka 的副本机制提高了数据可靠性。
- 分布式可扩展:Kafka 的数据是分布式存储在不同 broker 节点的,以 topic 组织数据并且按 partition 进行分布式存储,整体的扩展性都非常好。
- 高容错性:集群中任意一个 broker 节点宕机,Kafka 仍能对外提供服务。
流处理
什么是流处理
流处理是一种大数据技术,它用于查询连续的数据流,并在接收到数据后的一小段时间内快速检测条件。检测时间从几毫秒到几分钟不等。例如,通过流处理,可以在温度达到冰点时接收警报,查询来自温度传感器的数据流。
它也有很多其他名称:实时分析、流分析、复杂事件处理、实时流分析和事件流处理。
为什么需要流处理
随着 IoT物联网数据监控系统和视频流等大数据系统的出现,要求系统对不间断产生的数据实时处理分析。这些数据被分为两类:有边界数据和无边界数据。
有边界数据,是一种保存好了的数据,例如数据库或者csv中的数据等。
无边界数据,其实是一种持续增长的数据集,我们无法判断它到底什么什么时候结束,例如,银行的交易数据和物联网设备数据,每时每刻都会有数据产生,并且无法判断它什么时候会停止发送。我们也称之为事件流或流数据。
对于大数据,通常有两种处理方式:批处理和流处理。
批处理一般都具有高延迟性,有可能需要计算几小时甚至几天,但分析数据更加深入和彻底。流处理的特点是低延迟,处理速度快,响应时间一般是毫秒或者秒级的,事件流处理是指对事件以近实时的方式计算与处理,所以对实时性要求很高的场景,应该考虑使用流处理的方式处理数据。在现实中两种方式都被广泛使用,而采用哪种处理模式,应当由使用场景决定。
应用场景
事件流应用于众多行业和组织的各种用例:
- 实时处理支付和金融交易,例如证券交易所、银行和保险。
- 实时跟踪和监控汽车、卡车、车队和货运,例如在物流和汽车行业。
- 持续捕获和分析来自物联网设备或其他设备的传感器数据,例如工厂和风电场。
- 收集并立即响应客户互动和订单,例如零售、酒店和旅游行业以及移动应用程序。
- 监测住院病人,预测病情变化,确保在紧急情况下及时治疗。
- 连接、存储和提供公司不同部门产生的数据。
- 作为数据平台、事件驱动架构和微服务的基础。
流处理引擎
- Apache Storm
- Apache Samza
- Apache Spark Streaming
- Apache Flink
目前 Spark 和 Flink 两大流处理框架的市场份额最大,在企业中被广泛应用(大数据)。如字节跳动团队采用Spark 和 Flink混合部署的云原生架构模式,即Flink 不使用资源,或负载低的时候,资源可以出让给 Spark,Spark 执行完批式计算后,空闲的资源也可以出让给流式计算(Flink)用,这样可以充分利用计算资源。
而Kafka Stream 目前在流计算领域的份额并不大,一是Kafka Stream 的诞生时间比较晚,之前已经有很多流计算引擎诞生并广泛应用,二是像Spark 和 Flink 这样的计算引擎,本身设计就比较合理、优秀,一般很难被代替和超越。
Kafka API
Kafka 为 Java 和 Scala 提供了五个核心 API:
- Producer API
- Consumer API
- Admin API
- Stream API
- Connector API

生产者 Producer
将事件流发布(写入)到一个或多个 Kafka 主题
生产者客户端架构:

发送消息
异步发送
public String getProducer() {
//创建生产者
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.2.0.11:9092,10.2.0.12:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
return producer;
}
public String sendAsync() {
//异步发送消息
ProducerRecord<String, String> record = new ProducerRecord<String, String>("topic1", "key1", "value1");
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("生产者发送消息完成!");
if (exception != null) {
// TODO 消息发送失败的处理
exception.printStackTrace();
}
}
});
}
同步发送
public String sendSync() {
//省略部分代码
ProducerRecord<String, String> record = new ProducerRecord<String, String>("topic1", "key1", "value1");
//同步发送消息
try {
RecordMetadata recordMetadata = producer.send(record).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
批量发送
为减少负载和客户端的请求数量,生产者不会一条一条发送,而是会逗留一段时间批量发送。batch.size
和 linger.ms
满足任何一个条件都会发送。
Producer在发送消息的时候,会将消息放到一个ProducerBatch中,这个Batch可能包含多条消息,然后再将Batch打包发送。下面的生产者消息缓存设计会详细讲解这部分内容。
分区策略
Kafka 对于数据的读写粒度是分区,生产者在发送消息的时候,需要指定发送到哪个分区,Kafka 提供以下几种分区策略:
- DefaultPartitioner 默认分区策略
- UniformStickyPartitioner 纯粹的粘性分区策略
- RoundRobinPartitioner 分区策略
注:Kafka 的分区策略均实现 Partitioner
接口;
默认分区策略
默认分区策略:
- 如果记录中指定了分区,则使用它。
- 如果未指定分区但存在键,则根据键的散列选择分区。
- 如果不存在键和分区,则会使用粘性分区策略。(下面会详细讲解什么是粘性分区策略)
生产者消息记录实体:
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
/** 指定主题和消息(没有指定分区和key)*/
public ProducerRecord(String topic, V value);
/** 指定主题、key和消息(没有指定分区)*/
public ProducerRecord(String topic, K key, V value);
public ProducerRecord(String topic, Integer partition, K key, V value);
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers);
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value,
Iterable<Header> headers)
}
纯粹的粘性分区策略
分区策略:
- 如果记录中指定了分区,则使用它。
- 如果未指定分区,则会使用粘性分区策略。
注:UniformStickyPartitioner 与默认分区的唯一区别是,纯粹粘性分区策略会忽视设置的键,统一使用粘性分区策略分配。
RoundRobinPartitioner 策略
分区策略:
- 如果记录中指定了分区,则使用它。
- 如果未指定分区,将消息平均的分配到每个分区中。
自定义分区策略
通过配置 partitioner.class
参数可以指定自定义的分区策略,默认为 DefaultPartitioner。
partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
开发者可以实现 Partitioner
接口实现自己的分区器。
服务器应答确认
acks
配置参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。该参数有如下选项:
如果 acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,正因为生产者不需要等待服务器的响应,所以这种配置可以达到很高的吞吐量。
如果 acks=1,只要集群的Leader节点收到消息并响应给生产者,生产者就认为消息写入是成功的。如果Leader 节点崩溃,生产者会进行重试。
如果 acks=all,只有当所有参与复制的节点收到消息时,生产者才会收到来自服务器的成功响应。这种模式是最安全的,同时延迟也会比其他的更高。(注:参数值为 all
或 -1
是等价的)
失败重试机制
生产者发送消息时,如果发生了可重试的异常,生产者则会重试发送消息,避免消息丢失。
几个与重试有关的配置参数:
retries
重试次数 ,默认值为Integer.MAX_VALUE
,约21亿,即循环不断重试。retry.backoff.ms
重试间隔时间,默认值为100ms,适当调大可以防止重试过于频繁。
注:重试过程中不会重新计算消息的分区信息,所以重试可以保证幂等性不会被破坏。(一次send,只计算一次分区)
可重试异常
并不是所有的异常都可以重试,抛出的异常必须是可重试异常 RetriableException
,生产者才会重试。可重试异常是一个暂时异常,如果重试可能会成功。
查看 RetriableException
的子类可以了解具体哪些异常属于可重试异常。
记录顺序问题
如果设置 max.in.flight.requests.per.connection
参数大于1(默认为5,单个连接上发送的未确认请求的最大数量),可能会改变记录的顺序,因为如果将两个batch发送到同一个分区,第一个batch处理失败并重试,但是第二个batch处理成功,那么第二个batch中的记录可能先被消费。
解决方案:
设置 max.in.flight.requests.per.connection
参数为1,即不允许单个连接上有未确认请求的情况下继续发送请求,这样batch2必须等待batch1重试结束后,才能继续发送请求。(不过这样会影响吞吐量,所以只有对消息的处理顺序有严格要求的时候才这么做)
生产者压缩消息
网络传输经常使用到的一种优化手段,典型的CPU时间换磁盘空间或IO传输量的方式。
Kafka提供以下几种压缩类型:
- none (默认不压缩)
- gzip
- snappy
- lz4
- zstd
使用 compression.type
参数配置压缩类型:
compression.type=gzip
生产者配置参数
生产者常用配置参数:(生产者所有的配置参数定义在 ProducerConfig 类中)
参数 | 说明 | 备注 |
---|---|---|
acks | 该参数用来指定分区中必须要有多少个副本收到这条消息,生产者才会认为这条消息是成功写入的。 acks 是生产者客户端中一个非常重要参数 ,它涉及消息的可靠性和吞吐量之间的权衡 | |
bootstrap.servers | 配置broke 节点的地址,多个地址之间用逗号隔开 | 只需要配置部分 broke 节点的地址即可,不需要配置所有 broker 节点的地址,因为客户端可以自己发现其他 broker 点的地址,这一过程也属于元数据相关的更新操作 。 |
key.serializer | 消息中 key 对应的序列化类,需要实现org.apache.kafka.common.serialization.Serializer接口 | |
value.serializer | 消息中 value 对应的序列化类,需要实现org.apache.kafka.common.serialization.Serializer接口 | |
batch.size | 批量发送消息的缓存区ProducerBatch 的大小 | 默认值16384(16KB) |
max.request.size | 用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B ,即 1MB | 一般情况下,这个默认值就可以满足大多数的应用场景了。并不建议盲目地增大这个值 |
buffer.memory | 生产者客户端中用于缓存消息的缓冲区大小 (默认值33554432,32M) | 如果发送记录的速度超过记录被发送到服务器的速度,那么客户端的消息缓存就会越来越大,达到阀值后,生产者再发送消息将会阻塞,阻塞时间超过 max.block.ms ,则会抛出异常。 |
max.block.ms | 阻塞时间(默认:60000,1分钟) | 控制KafkaProducer.send() 和 KafkaProducer.partitionsFor() 阻塞的时间。由于缓冲区已满或元数据不可用,会被阻塞。 |
retries | 配置生产者重试的次数 | |
retry backoff.ms | 用来设定两次重试之间的时间间隔 | |
compression.type | 用来指定消息的压缩方式 | 默认值为“none ”,即默认情况下,消息不会被压缩。 |
connections.max.idle. ns | 参数用来指定在多久之后关闭闲置的连接 | 默认值是 540000 ms ,即 9分钟。 |
linge .ms | 这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息( ProducerRecord )加入 | 默认值为 0。生产者客户端会在 ProducerBatch 填满或等待时间超过linger ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。 |
partitioner.class | 用来指定分区器,需要实现 Partitioner 接口 | 默认分区器为DefaultPartitioner |
interceptor. classes | 用来设定生产者拦截器,需要实现 Prod cerlnterceptor 接口 | |
metadata.max.age.ms | 如果在这个时间内元数据没有更新的话会被强制更新 | |
enable.idempotence | 是否开启幂等性功能 | |
transactional. id | 设置事务ID,必须唯一 |
消费者 Consumer
订阅(读取)一个或多个主题并处理向它们生成的事件流。
Kafka 中的消费是基于拉模式的,拉模式是消费者主动向服务端发起请求来拉取消息。
创建消费者
简单示例:
public String createConsumer() {
//1.创建消费者
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.2.113.181:9092,10.2.113.182:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup1");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 2.订阅主题
consumer.subscribe(Collections.singletonList("customerTopic"));
//3. 批量拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
consumer.commitSync();
}
}
订阅主题、分配分区相关操作:
public class KafkaConsumer<K, V> implements Consumer<K, V> {
/** 订阅主题(集合),可增加消费组重平衡监听 **/
void subscribe(Collection<String> topics, ConsumerRebalanceListener listener);
/** 直接分配分区 **/
void assign(Collection<TopicPartition> partitions);
/** 取消使用 subscribe()订阅的主题。这也会清除通过 assign(Collection) 直接分配的任何分区。*/
void unsubscribe();
/**
获取当前分配给此使用者的分区集。 如果订阅是通过使用 assign(Collection) 直接分配分区发生的,那么这将简单地返回已分配 的相同分区。 如果使用了主题订阅,那么这将给出当前分配给消费者的主题分区集(如果尚未发生分配,或者分区正在重新分配,则可能没 有)。*/
Set<TopicPartition> assignment();
}
提交偏移量相关操作:
/** 从服务器订阅的主题中拉取数据(设置超时时间);
如果有可用记录,此方法会立即返回。 否则,将等待通过的超时。如果超时,将返回一个空记录集。 **/
ConsumerRecords<K, V> poll(final Duration timeout);
/** 获取将要获取的下一条记录的偏移量 **/
long position(TopicPartition partition);
/** 手动设置分区的偏移量 **/
void seek(TopicPartition partition, long offset);
/**
获取给定分区的最后提交的偏移量(无论提交是由该进程还是由另一个进程发生的),如果发生故障,返回的偏移量将用作消费者的位置。
此调用将执行远程调用以从服务器获取最新提交的偏移量,并将阻塞直到成功获取提交的偏移量、遇到不可恢复的错误(在这种情况下将其抛出 给调用者)或默认指定的超时。*/
Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions)
/** (同步)提交在最后一次 poll() 上返回的所有订阅的主题和分区列表的偏移量 */
void commitSync();
/** (同步)为指定的主题和分区列表提交指定的偏移量。*/
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets);
/** (异步)提交在最后一次 poll() 上返回的所有订阅的主题和分区列表的偏移量;
这是一个异步调用,不会阻塞。 遇到的任何错误都将传递给回调 */
void commitAsync(OffsetCommitCallback callback);
/** (异步)为指定的主题和分区列表提交指定的偏移量。*/
void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
消费者配置参数
消费者配置参数及说明定义在 ConsumerConfig
类中,想要了解全部参数详情查看该类源码即可
参数 | 说明 | 备注 |
---|---|---|
fetch.min.bytes | 该属性指定了消费者从服务器获取记录的最小字节数。 | |
fetch.max.wait.ms | 用于指定 broker 的等待时间,默认是 500 毫秒。如果没有足够的数据流入 kafka 的话,消费者获取的最小数据量要求就得不到满足,最终导致 500 毫秒的延迟。 | |
max.partition.fetch.bytes | 该属性指定了服务器从每个分区里返回给消费者的最大字节数,默认值 1MB | |
session.timeout.ms | 会话过期时间(默认3s) | 如果消费者没有在 session.timeout.ms 指定的时间内发送心跳给群组协调器,就会被认定为死亡,协调器就会触发重平衡。把它的分区分配给消费者群组中的其它消费者,此属性与 heartbeat.interval.ms 紧密相关。 |
heartbeat.interval.ms | 指定了消费者向群组协调器发送心跳的频率 | heartbeat.interval.ms 必须比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一 |
enable.auto.commit | 该属性指定了消费者是否自动提交偏移量,默认值是 true | |
auto.commit.interval.ms | 设置自动提交的频率 | 默认值5秒 |
auto.offset.reset | ||
partition.assignment.strategy | 设置分区分配策略 | 决定哪些分区应该被分配给哪个消费者,Kafka 有两个默认的分配策略Range 和 RoundRobin |
client.id | ||
max.poll.records | 控制单次调用 poll() 方法能够返回的最大记录数量 | |
receive.buffer.bytes 和 send.buffer.bytes | socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。 | 如果它们被设置为 -1,就使用操作系统默认值。如果生产者或消费者与 broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。 |
check.crcs
配置参数:
自动检查所消费记录的 CRC32,这样可以确保消息没有发生在线或磁盘损坏。这个检查增加了一些开销,所以在寻求极端性能的情况下可以考虑禁用此功能。
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-group1-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = group1
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
sasl.client.callback.handler.class = null
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
注:省略了 ssl 与 sasl 相关的配置参数(安全认证)
消费者组
一个 topic 可以让多个消费者进行消费,一个或多个消费者组成一个消费组( Consumer Group )。消费者组之间、消费者组内部的消费者关系是这样的:
- 在同一个消费者组内,不同的消费者不能消费同一条消息;
- 不同的消费者组可以消费同一条消息;
分区偏移量
偏移量 offset
首先理解为什么会有偏移量这个概念?如果消费者一直处于正常运行状态,那么偏移量基本上没有什么作用。不过,当消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一个提交的偏移量,然后从偏移量指定的地方继续处理。
消费者提交偏移量的本质是消费者往一个名为_consumer_offset
的特殊系统主题里发送消息,消息中包含每个分区的偏移量。消费者提交的偏移量如果处理不当,可能会产生不同的影响:
重复消费
如果提交的偏移量小于客户端处理的最后一个消息的偏移量 LastOffset,那么处于两个偏移量之间的消息就会被重复处理。
消息丢失
如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
KafkaConsumer API 提供了多种方式来提交偏移量。
自动提交
最简单的方式就是让消费者自动提交偏移量。如果 enable.auto.commit
被设置为true(默认为 true),那么每隔 auto.commit.interval.ms
时间(默认是 5s),消费者会自动通过 poll()
方法把轮询到的最大偏移量提交上去。
主动提交
把 auto.commit.offset
设置为 false,可以让应用程序决定何时提交偏移量。
同步提交
使用 commitSync()
同步提交偏移量。这个 API 会提交由 poll() 方法返回的最新偏移量,提交后马上返回(如果提交失败就抛出异常)。
异步提交
使用 commitAsync()
异步提交偏移量。
注:异步提交
commitAsync()
与同步提交commitSync()
最大的区别在于异步提交不会进行重试,同步提交会一直进行重试。
组合提交
一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大的问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但是如果在关闭消费者或再均衡前的最后一次提交,就要确保提交成功。
因此,在消费者关闭之前一般会组合使用同步和异步两种方式提交偏移量。
提交特定偏移量
消费者API允许调用 commitSync() 和 commitAsync() 方法时传入希望提交的 partition 和 offset ,即给指定的分区提交特定的偏移量。
分配策略
在Kafka中,消费者组的分区分配策略有以下几种:
- Range(范围):每个消费者按照分区的连续范围分配分区。例如,如果有两个消费者,一个分区有四个分片,则第一个消费者将分配分区0和1,而第二个消费者将分配分区2和3。
- Round Robin(循环):消费者以轮询方式分配分区。例如,如果有两个消费者,一个分区有四个分片,则第一个消费者将分配分区0和2,而第二个消费者将分配分区1和3。
- Sticky(粘性):消费者在重平衡后仍保持其原始分区分配。这意味着如果消费者组中的消费者崩溃并重新启动,则它将尝试获得先前拥有的分区。
- Custom(自定义):可以通过实现
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
接口来定义自己的分区分配策略。
在 Kafka 中,默认的分区分配策略是 Range。如果您不希望使用默认策略,可以在消费者配置中设置 partition.assignment.strategy
属性,以指定使用哪种分配策略。例如,要使用 Round Robin 分配策略,可以将其设置为 org.apache.kafka.clients.consumer.RoundRobinAssignor
。
消费者重平衡
对于Kafka 消费者而言,有一个无法避免的问题就是消费者重平衡 (Rebalance),Rebalance 是让一个消费组的所有消费者就如何消费订阅 topic 的所有分区达成共识的过程,在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 的完成。因为要停止消费等待重平衡完成,因此 Rebalance 会严重影响消费端的 TPS,是应当尽量避免的。

Rebalance 发生的情况
以下三种场景会触发Rebalance :
- 消费组的消费者成员数量发生变化
- 消费主题的数量发生变化
- 消费主题的分区数量发生变化
其中后两种情况一般是计划内的,比如为了提高消息吞吐量增加 topic 分区数,这些情况一般是不可避免的,后面我们会重点讨论如何避免因为组内消费者成员数发生变化导致的 Rebalance。
尽量避免消费组 Rebalance
这里涉及两个消费端参数:session.timeout.ms
和 heartbeat.interval.ms
,含义分别是组协调器认为消费组存活的期限,和消费者发送心跳的时间间隔,其中心跳时间间隔默认值是3s,会话过期时间默认 10s。
消费端参数 max.poll.interval.ms
表示 Consumer 两次调用 poll 方法拉取数据的最大时间间隔,默认值 5min,对于那些忙于业务逻辑处理导致超过最大拉取时间间隔的消费者将会离开消费组,此时将发生一次 Rebalance。
此外,如果 Consumer 端频繁 FullGC 也可能会导致消费端长时间停顿,从而引发 Rebalance。
综上所述,只要注意以上三点的配置和监控,便能有效的避免消费组重平衡。
组协调器
正常情况下,每个消费者都会定期向组协调器 (Group Coordinator) 发送心跳,表明自己还在存活,如果消费者不能及时的发送心跳,组协调器会认为该消费者已经下线,就会导致消费者组触发 Rebalance。
对于每一个Consumer Group,Kafka将会从从Broker集群中选择一个Broker作为其Coordinator,组协调器主要做两件事:
- 维持Group成员的组成。这包括加入新的成员,检测成员的存活性,清除不再存活的成员。
- 协调Group成员的行为。
数据丢失
出现数据丢失的环节有三个:
- 生产者
- broker 服务器
- 消费者
生产者
问题场景:
- 当出现网络问题时,消息没有发送到broker;
- 没有启用发送到达确认机制
acks
参数,或者设置不合理;
解决方案:
- 启用失败重试机制,设置
retries
参数(重试次数、重试间隔); - 启用发送达到确认机制,设置
acks=1
或者acks=all
; - 客户端发送消息设置回调函数;
服务器
问题场景:
- 数据写入
PageCache
后,还没刷到磁盘,机器宕机;此时会丢失缓存中的数据; - 未设置冗余的副本,没有数据高可靠;
解决方案:
- 使用带后备电源的缓存;
- 副本数设置 3 个:
replication.factor = 3
- 消息写入 2个副本才算发送成功(前提需要
acks=all
) :min.insync.replicas > 2
- 防止不在
ISR
列表中的 Follower 被选举为 Leader :unclean.leader.election.enable=false
注:
如果acks=1或0,则表示完全的异步复制;
如果acks=all 则代表完全的同步复制,而此时如果没有最少同步副本数的限制,那样会牺牲容错性。
min.insync.replicas
参数给了用户做权衡的选择,如副本数设置为3,最少同步副本数为2,这样的设置既有同步复制也有异步复制,保证大多数副本收到写入成功,否则生产者引发异常。
消费者
问题场景:
- 自动提交,消息还没有处理完毕,就自动提交了,如果此时消费者宕机,消息就丢失了;
- 心跳超时,引发重平衡;
- 消息堆积,消费能力低;
解决方案:
- 取消自动提交
auto.commit = false
,改为手动提交。 - 尽量提高客户端的消费速度,消费逻辑另起线程进行处理。
监听再平衡
前面说到,如果在关闭消费者或再均衡前的最后一次提交,就要确保提交成功,否则可能会出现数据丢失的问题。
Kafka Consumer API 支持在订阅主题是设置消费者重平衡监听器:
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
public void consumerDemo() {
//消费者订阅主题,同时增加消费者重平衡的监听
consumer.subscribe(Collections.singletonList("customerTopic"), new HandleRebalance());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
logger.info("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, "no metadata"));
}
consumer.commitAsync(currentOffsets, null);
}
}
//在监听器中的同步提交当前已处理完的偏移量
class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance.Committing current offsets:" + currentOffsets);
consumer.commitSync(currentOffsets);
}
}
在监听器中调用 commitSync() 方法,确保在再均衡发生之前提交偏移量。
Admin API
Admin API 主要提供与Kafka 运维操作相关的程序接口,主要有以下几种操作:
- 主题管理:包括主题的创建、删除和查询。
- 权限管理:包括具体权限的配置与删除。
- 配置参数管理:包括 Kafka 各种资源的参数设置、详情查询。所谓的 Kafka 资源,主要有 Broker、主题、用户、Client-id 等。
- 副本日志管理:包括副本底层日志路径的变更和详情查询。
- 分区管理:即创建额外的主题分区。
- 消息删除:即删除指定位移之前的分区消息。
- 授权认证管理:包括 Delegation Token 的创建、更新、过期和详情查询。
- 消费者组管理:包括消费者组的查询、位移查询和删除。
- Preferred 领导者选举:推选指定主题分区的 Preferred Broker 为领导者。
示例代码:
public void adminApiDemo(){
AdminClient client = AdminClient.create(kafkaAdmin.getConfigurationProperties());
/************* 主题管理 ************/
//创建主题Topic
client.createTopics(newTopics);
//查看指定主题的详情信息
client.describeTopics(Arrays.asList("testTopic"));
//查看指定资源的配置信息(broker,topic)
client.describeConfigs(Arrays.asList(testTopic));
//查看所有主题列表
client.listTopics();
//删除主题
client.deleteTopics(Arrays.asList("testTopic"));
//增加分区数量
adminClient.createPartitions(newPartitions);
//删除消息记录
adminClient.deleteRecords(recordMap);
//查询所有的消费者分组
adminClient.listConsumerGroups();
/************* 偏移量相关操作 ************/
//查询所有主题分区的偏移量信息
adminClient.listOffsets()
//查询所有的消费者分组偏移量
adminClient.listConsumerGroupOffsets()
//改变指定组的偏移量
adminClient.alterConsumerGroupOffsets();
/************* 事务相关操作 ************/
//查询所有的事务信息
adminClient.listTransactions();
//查询具体的事务详情
adminClient.describeTransactions()
//回滚事务
adminClient.abortTransaction()
/************* 权限相关操作 ************/
//创建绑定到指定资源的访问控制列表 (ACL)。
adminClient.createAcls()
//删除权限
adminClient.deleteAcls()
//查看权限
adminClient.describeAcls()
//修改分区副本的底层日志存储目录
adminClient.alterReplicaLogDirs()
//对kafka客户端进行限流
adminClient.alterClientQuotas()
//查看集群中节点的信息
adminClient.describeCluster()
}
Stream API
Kafka Stream是基于Kafka的流式处理类库,用于实时流处理和分析存储在Kafka Broker 中的数据。
用于实现流处理应用程序和微服务的 Kafka Streams API。它提供了更高级别的函数来处理事件流,包括转换、聚合和连接等有状态操作、窗口化、基于事件时间的处理等等。从一个或多个主题读取输入以生成一个或多个主题的输出,有效地将输入流转换为输出流。
Kafka 最初的定位并不是流处理平台,Kafka Stream是Apache Kafka从
0.10
版本引入的一个新特性。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。
流计算
首先我们需要掌握一些流处理领域内的关键概念,即流、表以及流表二元性,时间和时间窗口。
流表二元性
首先,我来介绍一下流处理中流和表的概念,以及它们之间的关系。
流就是一个永不停止(理论上是这样的)的事件序列,而表和关系型数据库中的概念类似,是一组行记录。在流处理领域,两者是有机统一的:流在时间维度上聚合之后形成表,表在时间维度上不断更新形成流,这就是所谓的流表二元性(Duality of Streams and Tables)
时间
Kafka支持三类时间:
- 事件发生时间:事件发生的时间,包含在数据记录中。发生时间由Producer在构造ProducerRecord时指定,并且需要Broker或者Topic将
message.timestamp.type
设置为 CreateTime(默认值)才能生效。 - 事件接收时间:也即消息存入Broker的时间。当Broker或Topic将
message.timestamp.type
设置为LogAppendTime时生效。此时Broker会在接收到消息后,存入磁盘前,将其timestamp属性值设置为当前机器时间。一般消息接收时间比较接近于事件发生时间,部分场景下可代替事件发生时间。 - 事件处理时间。也即Kafka Stream处理消息时的时间。
在实际场景中,**事件处理时间(Processing Time)永远滞后于事件发生时间(Event Time)**,而且滞后程度不稳定,没有规律可寻。所以如果流处理应用要实现结果的正确性,就必须要使用基于 Event Time 的时间窗口,而不能使用基于 Processing Time 的时间窗口。
时间窗口
所谓的时间窗口机制,就是将流数据沿着时间线切分的过程。常见的时间窗口包括:固定时间窗口(Fixed Windows)、滑动时间窗口(Sliding Windows)和会话窗口(Session Windows)。Kafka Streams 同时支持这三类时间窗口。在后面我会详细介绍如何使用 Kafka Streams API 实现时间窗口功能。
流式数据在时间上无界的,但是聚合操作只能作用于特定有界的数据集,这个要如何解决呢?这就有了时间窗口的概念,在时间无界的数据流中定义一个边界来用于计算。Kafka支持的窗口如下:
添加依赖包:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.3.0</version>
</dependency>`
Connector API
用于构建和运行可重用的数据导入/导出连接器,这些连接器从外部系统和应用程序消费(读取)或产生(写入)事件流,以便它们可以与 Kafka 集成。例如,与 PostgreSQL 等关系数据库的连接器可能会捕获对一组表的每次更改。但是,在实践中,您通常不需要实现自己的连接器,因为 Kafka 社区已经提供了数百个即用型连接器。
删除数据
对于传统的消息队列而言,一般会删除已经被消费的消息,而kafka集群会保留所有的消息。但因为磁盘限制,不可能永久保留所有消息,因此kafka提供了两种策略删除数据:
基于日志时间,让kafka删除过期2天或一周的数据;
基于日志大小:让kafka在partition文件超过1GB时删除数据;
基于文件时间删除
基于时间的日志保留策略,默认情况下,每隔一段时间Kafka检查数据是否过期,过期的数据会被自动删除。
# 检查日志段是否可以根据保留策略删除的时间间隔:5分钟
log.retention.check.interval.ms=300000
# 配置数据可以保留多久,默认是 168 个小时(7天)。
log.retention.hours=168
# 执行实际删除操作的间隔时间(默认15秒)
log.cleaner.backoff.ms=15000
# 日志段文件的最大大小。 当达到这个大小时,将创建一个新的日志段。(默认1Gb)
log.segment.bytes=1073741824
删除逻辑:
有一个程序,每5分钟会检查一次是否有日志段文件的“最大时间戳”与当前时间的差值超过了7天,如果有,则将这个日志段文件加上.delete
后缀。
还有一个程序,每15秒会检查一次是否有标记为.delete
后缀的日志段文件,如果有,则执行删除操作。
基于文件大小删除
基于容量的日志保留策略:
# 配置日志文件的最大总容量
log.retention.bytes=1073741824
删除逻辑:
有一个程序,每5分钟统计一次当前路径下所有日志文件大小,如果总大小超过阈值(2GB)的部分多于一个日志段文件的大小(1GB),则将最早的日志段文件加上.delete
后缀。
还有一个程序,每15秒会检查一次是否有标记为.delete
的日志文件,如果有,则执行删除操作。
删除Topic
第一步:设置参数
将 delete.topic.enable
参数设置为true(默认为true),如果该参数值为false,不会被删除。
第二步:停止生产和消费
如果有程序正在生产或者消费该topic,则该topic的offset信息一致会在broker更新。调用kafka delete命令则无法删除该topic。
第三步:执行删除命令
bin/kafka-topics.sh -delete --zookeeper 172.16.10.91:2181,172.16.10.92:2181 --topic logstash-log-dev
日常运维
安装启动
由于kafka是通过zookeeper来调度的,所以,即使是单机kafka也需要启动zookeeper服务,kafka的安装目录下是默认集成了zookeeper,直接启动即可。
ZK模式下启动
# 启动 zookeeper
zookeeper-server-start.bat ..\..\config\zookeeper.properties
# 启动 kafka
kafka-server-start.bat ..\..\config\server.properties
KRaft 模式下启动
官方声明:Kafka 2.8到3.2版本 中的 KRaft 模式仅用于测试,不适用于生产。
config/kraft/server.properties 配置文件
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093
#
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
#
log.dirs=D:/kafka/logs
管理Topic
使用 kafka-topics.sh 脚本管理,功能:创建、删除、查看或修改topic
参数说明
参数 | 含义 |
---|---|
–list | 列出所有可用的主题 |
–create | 创建一个新的topic |
–topic <String: topic> | 要创建、修改删除的topic名称,除–create选项外还可以接受正则表达式 |
–partitions <Integer: 分区数量> | 正在创建或更改的topic的分区数 |
–replication-factor <Integer:replication factor> | 创建topic每个partition的副本数量 |
–describe | 查看topic详细描述信息 |
–unavailable-partitions | 如果在描述主题时设置,则仅显示其Leader不可用的分区 |
–topics-with-overrides | 如果在描述主题时设置,则仅显示已覆盖配置的主题 |
–alter | 更改主题的分区数、副本分配或配置。 |
–replica-assignment <String:broker_id_for _part1_replica1 :broker_id_for_part1_replica2…> | 创建或修改topic,手动指定partition数,副本数在哪些broker id上 |
–config <String: name=value> | topic 相关配置,详细配置可以使用help命令查看 |
–delete | 删除一个topic |
–delete-config <String: name> | 删除配置 |
–if-exists | 检查topic是否存在,如果存在才会做修改或删除的相关操作 |
–if-not-exists | 检查topic是否不存在,只有在主题不存在时才会进行创建操作 |
–help | 帮助文档 |
创建topic
# 创建topic:指定主题名称、分区数量、副本数量
kafka-topics.bat --bootstrap-server 117.78.38.62:9092 --topic kafka-test-topic --create --partitions 1 --replication-factor 1
查看某个Topic的详情信息
>kafka-topics.bat --bootstrap-server 117.78.38.62:9092 --topic iot.ruleHandle --describe
Topic: iot.ruleHandle TopicId: TJGZnNxwTD68ZsFgThqkXw PartitionCount: 5 ReplicationFactor: 1 Configs:
Topic: iot.ruleHandle Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: iot.ruleHandle Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: iot.ruleHandle Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: iot.ruleHandle Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: iot.ruleHandle Partition: 4 Leader: 0 Replicas: 0 Isr: 0
生产消息
使用 kafka-console-producer.sh 脚本执行
# producer生产消息
kafka-console-producer.bat --broker-list 117.78.38.62:9092 --topic iot.pushMessage
# producer生产消息(附带key)
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topicName --property parse.key=true
默认消息键与消息值间使用“Tab键”进行分隔,切勿使用转义字符(\t),如下所示:
>Lei Li Hello Kafka!
>Meimei Han 你好 kafka!
消费消息
使用 kafka-console-consumer.sh 脚本执行
# 消费消息
kafka-console-consumer.bat --bootstrap-server 117.78.38.62:9092 --topic iot.pushMessage
# 消费消息(显示key)
kafka-console-consumer.bat --bootstrap-server 117.78.38.62:9092 --topic iot.pushMessage --property print.key=true
# 消费消息,指定消费者组
kafka-console-consumer.bat --bootstrap-server 117.78.38.62:9092 --topic iot.pushMessage --group group-2
消费者组管理
–list, –describe, –delete, –reset-offsets, –delete-offsets
# 查看消费者组列表
kafka-consumer-groups.bat --bootstrap-server 117.78.38.62:9092 --list
# 查看所有消费者组详情
kafka-consumer-groups.bat --bootstrap-server 117.78.38.62:9092 --describe --all-groups
# 查看指定消费组详情
kafka-consumer-groups.bat --bootstrap-server 117.78.38.62:9092 --describe --group group3
# 查看所有消费组成员详情
kafka-consumer-groups.bat --bootstrap-server 117.78.38.62:9092 --describe --all-groups --members
# 查看指定消费组成员详情
kafka-consumer-groups.bat --bootstrap-server 117.78.38.62:9092 --describe --group group3 --members
# 删除指定消费组:删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除;否则会报异常。
kafka-consumer-groups.bat --bootstrap-server 117.78.38.62:9092 --group group3 --delete
# 重置消费组的偏移量 --reset-offsets
查看消费者组详情执行结果:
- GROUP 消费者组
- TOPIC 主题
- PARTITION 分区
- CURRENT-OFFSET 当前偏移量
- LOG-END-OFFSET 日志末尾偏移量
- LAG 延迟偏移量 = 日志末尾偏移量 - 当前偏移量
- CONSUMER-ID 消费者ID
示例
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
iot-group iot.ruleHandle 3 4158 4158 0 consumer-iot-group-4-bf6f3d01-9487-4a73-a2eb-d5a3a0d357f4 /117.78.38.62 consumer-iot-group-4
iot-group iot.ruleHandle 2 4242 4242 0 consumer-iot-group-3-2cab6d61-25e0-4e69-99f0-cf87b246fd98 /117.78.38.62 consumer-iot-group-3
D:\apache-kafka\kafka_2.13-3.2.0\bin\windows>kafka-consumer-groups.bat --bootstrap-server 117.78.38.62:9092 --describe --group group3 --members
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS
group3 console-consumer-088418a1-c6ab-40f5-ad46-26968c8cd39e /117.119.74.94 console-consumer 1
重置消费者组偏移量
能够执行成功的一个前提是 消费组这会是不可用状态。
下面的示例使用的参数是: --dry-run
这个参数表示预执行,会打印出来将要处理的结果,等你想真正执行的时候请换成参数--excute
;
下面示例 重置模式都是 –to-earliest 重置到最早的,请根据需要参考下面 相关重置Offset的模式 换成其他模式;
# 重置指定消费组的所有Topic的偏移量--all-topic
kafka-consumer-groups.bat --bootstrap-server 117.78.38.62:9092 --group group3 --all-topic --reset-offsets --to-earliest --dry-run
# 重置指定消费组的指定Topic的偏移量--topic
kafka-consumer-groups.bat --bootstrap-server 117.78.38.62:9092 --group group3 --topic iot.pushMessage --reset-offsets --to-earliest --dry-run
Offset的重置模式
重置模式 | 描述 | 例子 |
---|---|---|
–to-earliest | 重置offset到最开始的那条offset(找到还未被删除最早的那个offset) | |
–to-current | 直接重置offset到当前的offset,也就是LOE | |
–to-latest | 重置到最后一个offset | |
–to-datetime | 重置到指定时间的offset,格式为:YYYY-MM-DDTHH:mm:SS.sss | –to-datetime “2021-6-26T00:00:00.000” |
客户端性能测试
Apache Kafka 官方提供了两个客户端性能测试脚本,它们的存放位置如下:
- 生产者性能测试脚本:$KAFKA_HOME/bin/kafka-producer-perf-test.sh
- 消费者性能测试脚本:$KAFKA_HOME/bin/kafka-consumer-perf-test.sh
生产者性能测试脚本支持测试的性能指标包括:吞吐量(throughput)、最大时延(max-latency)、平均时延(avg-latency);
消费者性能测试脚本同样支持吞吐量指标,还提供了一些消费端特有的指标
Broker配置参数
常用配置
############################# 服务器基本配置 #############################
# broker ID 必须设置一个唯一的整数
broker.id=0
# 服务器用于接收来自网络的请求并向网络发送响应的线程数
num.network.threads=3
# 服务器用于处理请求的线程数,可能包括磁盘 I/O
num.io.threads=8
# 套接字服务器使用的发送缓冲区(SO_SNDBUF)
socket.send.buffer.bytes=102400
# 套接字服务器使用的接收缓冲区(SO_RCVBUF)
socket.receive.buffer.bytes=102400
# 套接字服务器将接受的请求的最大大小(防止OOM)
socket.request.max.bytes=104857600
############################# 日志基础配置 #############################
# 存储日志文件的位置(以逗号分隔的目录列表)
log.dirs=/root/kafka/logs1,/root/kafka/logs2
# 每个主题的默认日志分区数。 更多的分区允许更大的并行消费,但这也会导致跨代理的文件增多。
num.partitions=1
# 每个数据目录的线程数,用于在启动时恢复日志并在关闭时刷新。对于数据目录位于 RAID 阵列中的安装,建议增加此值。
num.recovery.threads.per.data.dir=1
############################# 日志刷新策略 #############################
# 在强制将数据刷新到磁盘之前要接受的消息数
log.flush.interval.messages=10000
# 在我们强制刷新之前,一条消息可以在日志中停留的最长时间
log.flush.interval.ms=1000
############################# 日志保留策略 #############################
# 检查日志段是否可以根据保留策略删除的时间间隔;5分钟
log.retention.check.interval.ms=300000
# 配置数据可以保留多久,默认是 168 个小时,也就是一周。
log.retention.hours=168
# 日志段文件的最大大小。 当达到这个大小时,将创建一个新的日志段。(默认1Gb)
log.segment.bytes=1073741824
############################# 内部主题设置 #############################
# 设置组元数据内部主题“__consumer_offsets”和“__transaction_state”的复制因子,开发环境建议使用大于1的值以确保可用性,例如3(对于开发测试可以设置为1)
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
############################# Zookeeper 配置 #############################
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
############################# 组协调器配置 #############################
# 设置组协调器GroupCoordinator 延迟初始消费者重新平衡的时间(单位:毫秒),最大为 max.poll.interval.ms
# 设置为0可以方便开发和测试,但在生产环境中,默认值 3 秒更合适,因为这将有助于避免在应用程序启动期间不必要且可能代价高昂的重新平衡。
group.initial.rebalance.delay.ms=3000
日志刷新策略
默认情况下我们只使用 fsync() 来延迟同步 OS 缓存。 以下配置控制数据强制刷新到磁盘。
这里有一些重要的权衡:
1.持久性:如果不使用复制,未刷新的数据可能会丢失。
2.延迟:非常大的刷新间隔可能会在刷新确实发生时导致延迟峰值,因为将有大量数据要刷新。
3.吞吐量:flush一般是最昂贵的操作,小的flush间隔可能会导致过多的seek。
下面的设置允许配置刷新策略在一段时间或每 N 条消息(或两者)后刷新数据。 这可以在全局范围内完成并在每个主题的基础上覆盖。
# 在强制将数据刷新到磁盘之前要接受的消息数
log.flush.interval.messages=10000
# 在我们强制刷新之前,一条消息可以在日志中停留的最长时间
log.flush.interval.ms=1000
集群
Kafka ZK模式
注:截止到Kafka3.2 版本(2022年),Kafka的生产稳定版依然完全依赖 ZooKeeper 进行分布式协调工作,新版本推出的Raft 元数据模式只是实验功能,并不能用于生产环境。
Apache Kafka 依赖 ZooKeeper 完成以下工作:
控制器选举
控制器是 Kafka 生态系统中最重要的代理实体之一,它还负责维护跨所有分区的领导-从属关系。如果某个节点由于某种原因正在关闭,则控制器有责任告诉所有副本充当分区领导者,以便在即将失败的节点上履行分区领导者的职责。因此,每当一个节点关闭时,都可以选出一个新的控制器,并且还可以确保在任何给定时间,只有一个控制器,并且所有的跟随节点都同意这一点。
主题配置
有关所有主题的配置,包括现有主题列表、每个主题的分区数量、所有副本的位置、所有主题的配置覆盖列表以及哪个节点是首选领导者等。
访问控制列表
所有主题的访问控制列表或 ACL 也在 Zookeeper 中维护。
集群成员
Zookeeper 还维护一个在任何给定时刻运行的所有代理的列表,这些代理是集群的一部分 [9]。
Kafka启动后会在Zookeeper的根路径 /
下初始化以下节点数据:
- admin
- brokers
- cluster
- config
- consumers
- controller_epoch
- feature
- isr_change_notification
- latest_producer_id_block
- log_dir_event_notification
Kafka Raft模式
2.8.0版本之前,ZooKeeper 在 Kafka 中扮演着重要的角色,用来存储 Kafka 的元数据。ZooKeeper 存储着 Partition 和 Broker 的元数据 ,同时也负责 Kafka Controller 的选举工作。
对于 Kafka 来讲,ZooKeeper 是一套外部系统,要想部署一套 Kafka 集群,就要同时部署、管理、监控 ZooKeeper。这无疑是提高了复杂性。从2.8.0版本起,Kafka 可以脱离ZooKeeper 来部署集群了,我们称之为【Kafka Raft 元数据模式】。
控制器 Controller
控制器(Controller)是 Kafka 的核心组件,主要作用是在 Zookeeper 的帮助下管理和协调整个 Kafka 集群。集群中任意一个 broker 都能充当控制器的角色,但在运行过程中只能有一个 broker 成为控制器。
Controller Broker
在分布式系统中,通常需要有一个协调者在分布式系统发生异常时发挥特殊的作用。在Kafka中该协调者称之为控制器(Controller),其实该控制器并没有什么特殊之处,它本身也是一个普通的Broker,只不过需要负责一些额外的工作。
Controller 选择策略
上面说每台 Broker 都有充当控制器的可能性。那么,控制器是如何被选出来的呢?
实际上,Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。Kafka 当前选举控制器的规则是:第一个成功创建 /controller 节点的 Broker 会被指定为控制器。
Controller 作用
- 集群成员管理(新增Broker,Broker停机、宕机)
- 主题管理(创建、删除主题)
- Preferred 领导者选举
- 分区重分配
- 数据服务
故障转移
处理脑裂
Zookeeper中有一个与控制器有关的 /controller_epoch
节点,该节点是持久节点,节点中存放的是一个整型的controller_epoch值。controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为“控制器的纪元”。
controller_epoch 的初始值为1,即集群中第一个控制器的纪元为1,当控制器发生变更时,每选出一个新的控制器就将该字段值加1。每个和控制器交互的请求都会携带上controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。如果请求的controller_epoch值大于内存中的controller_epoch值,那么则说明已经有新的控制器当选了。
元数据 Metadata
元数据是指 Kafka 集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的 lead 副本、follower 副本分配在哪个节点上, 哪些副本在 AR、ISR 等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超 metadata max.age.ms
时间没有更新元数据,都会引起元数据的更新操作 。客户端参数 metadata max.age.ms
的默认值为 300000 ,即 5分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出 leastLoadedNode
, 然后 这个Node 发送 MetadataRequest 请求来获取具体的元数据信息。
leastLoadedNode
是集群中所有 Node 中负载最小的那个节点。这里的负载最小是通过每个节点中还未确认的请求决定的,未确认的请求越多则认为负载越大,反之越小。选择 leastLoadedNode发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。 leastLoadedNode 的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。
性能监控
Kafka 作为一个非常重要的产品,已经在很多互联网企业里被作为关键组件部署了。而 Kafka 的性能监控也早就是一个非常重要的问题,Kafka 本身并不自带性能监控平台,很多公司比如雅虎自己内部开发了这样的系统。
CMAK 集群管理器
Kafka 集群管理器(原名:Kafka Manager,雅虎开源)
GitHub开源地址:https://github.com/yahoo/CMAK/
Confluent Enterprise
Confluent 开发的控制平台无疑应该是最可靠的,毕竟没有人比 Kafka 的开发者更了解自己的产品。可惜这个是收费产品,而且不开源。Confluent Enterprise 同时还自带了数据自动负载平衡和跨数据中心数据复制的能力。
高级特性
分区 Partition
kafka的每个topic都可以创建多个partition,partition的数量无上限,并不会像replica一样受限于broker的数量,因此partition的数量可以随心所欲的设置。那确定partition的数量就需要思考一些权衡因素。
分区副本 Replication
Kafka 的分区副本机制(Replication)是Kafka 实现高可靠、高可用的基础。

副本类型
Kafka 中有 leader 和 follower 两类副本,一个分区,leader 只有一个,follower 可以有多个。
分区副本数量由 broker 端参数 default.replication.factor
控制,默认值为 1,即只有一个leader 副本,没有follower 副本。Kafka 的生产环境下建议设置为 3 个副本,最多不要超过5个。
副本作用
leader 副本对外提供读写服务,follower 副本的主要工作就是从 leader 副本异步拉取消息,进行消息数据的同步,并不对外提供读写服务。
Kafka 之所以这样设计,主要是为了保证读写一致性,因为副本同步是一个异步的过程,如果当 follower 副本还没完全和 leader 同步时,从 follower 副本读取数据可能会读不到最新的消息。
副本主要作用:
- 消息冗余存储,提高 Kafka 数据的可靠性;
- 提高 Kafka 服务的可用性,follower 副本能够在 leader 副本挂掉或者 broker 宕机的时候参与 leader 选举,继续对外提供读写服务。
分区重分配
ISR 同步副本集合
为了维护分区副本的同步,引入 ISR(In-Sync Replicas)副本集合的概念,ISR 是分区中正在与 leader 副本进行同步的 replica 列表,且必定包含 leader 副本。
ISR 列表是持久化在 Zookeeper 中的,任何在 ISR 列表中的副本都有资格参与 leader 选举。
ISR 列表是动态变化的,并不是所有的分区副本都在 ISR 列表中,哪些副本会被包含在 ISR 列表中呢?副本被包含在 ISR 列表中的条件是由 replica.lag.time.max.ms
参数控制的,参数含义是副本同步落后于 leader 的最大时间间隔,默认10s,意思就是说如果某一 follower 副本中的消息比 leader 延时超过10s,就会被从 ISR 中排除。
Kafka 之所以这样设计,主要是为了减少消息丢失,只有与 leader 副本进行实时同步的 follower 副本才有资格参与 leader 选举,这里指相对实时。
unclean leader 选举
既然 ISR 是动态变化的,所以 ISR 列表就有为空的时候,ISR 为空说明 leader 副本也“挂掉”了,此时 Kafka 就要重新选举出新的 leader。但 ISR 为空,怎么进行 leader 选举呢?
Kafka 把不在 ISR 列表中的存活副本称为“非同步副本”,这些副本中的消息远远落后于 leader,如果选举这种副本作为 leader 的话就可能造成数据丢失。Kafka broker 端提供了一个参数 unclean.leader.election.enable
,用于控制是否允许非同步副本参与 leader 选举;如果开启,则当 ISR 为空时就会从这些副本中选举新的 leader,这个过程称为 Unclean leader 选举。
如果开启 Unclean leader 选举,可能会造成数据丢失,但保证了始终有一个 leader 副本对外提供服务;保证可用性,失去一致性;如果禁用 Unclean leader 选举,就会避免数据丢失,但这时分区就会不可用。保证一致性,失去可用性。这就是典型的 CAP 理论,即一个系统不可能同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition Tolerance)。所以在这个问题上,Kafka 赋予了我们选择 CP 或 AP 的权利。
可以根据实际的业务场景选择是否开启 unclean leader选举,这里建议关闭 unclean leader 选举,因为通常数据的一致性要比可用性重要的多。
数据同步过程
Kafka 使用高水位(high watermark,HW)和日志结束偏移量(log end offset,LEO)两个记录位置的变量来处理数据同步过程。

当生产者给当前分区推送7条数据(假设一条数据占位一个偏移量),其中1-5条数据在所有follower 副本中都存在(即已同步完成),而6-7 两条数据还没有完全复制完,此时的高水位HW=5,表示所有的副本已完成同步到5这个偏移量。
LEO 则记录了生产者下次要写入的位置。每个分区副本都会维护 HW和LEO 两个变量值,并且Leader 副本额外会冗余一份所有相关联 follower 副本的 LEO值,方便计算HW值。

假设这样的场景,当前分区没有被写入数据,HW=LEO=0,此时生产者推送两条数据到Leader(假设总偏移量为2),Leader 将数据写入PageCache,将自己的LEO更改为2后,此时follower 副本开始发送 fetch 请求,同步数据:

上面的流程是两次请求和响应,对应两阶段提交。第一次fetch请求,follower 会告诉Leader要从指定偏移量的位置开始拉取数据,同时Leader响应时除了返回数据,还携带着当前分区的HW值(Leader的HW值就代表了当前分区的HW值),可以看到follower 必须经过两次请求才可以更新自己的HW。Leader 副本高水位更新和 Follower 副本的高水位更新是存在一些时间偏差的。
follower 副本的日志截断
上面讲述的是正常情况下的复制流程,假设下面这种场景:
消息会先记录到 leader,follower 再从 leader 中拉取消息进行同步,这就导致同一时刻,leader LEO 通常会比 follower 的要大(follower 之间的 offset 也不尽相同,虽然最终会一致,但过程中会有差异),假设此时出现 leader 切换,有可能选举了一个 LEO 较小的 follower 成为新的 leader,这时该副本的 LEO 就会成为新的标准,这就会导致其他 follower LEO 值有可能会比 leader LEO 值要大的情况,因此 follower 在进行同步之前,需要从 leader 获取分区的LastOffset 值(分区的最后提交偏移量),如果 LastOffset 小于 当前 LEO,则需要进行日志截断,然后再从 leader 拉取数据实现同步。
日志截断只会发生在follower 副本身上。
副本之间的数据同步也可能出现问题:数据丢失问题和数据不一致问题。 KIP-101中列举了两个场景,描述了在没有提出leader epoch的情况下,会出现两种数据不一致的问题,下面逐一分析这两个场景。
场景1-数据丢失

副本B为Leader,副本A为Follower,当2.2流程执行完,2.3还没执行的时刻,副本A重启,重启后,Follower要根据HW进行日志截断,把偏移量为1的数据删除,此时副本B宕机,副本A成为新的Leader,副本B恢复后成为Follower,同样也会进行日志截断,将偏移量为1的数据删除,此时副本A和B均丢失了数据m2。
问题在哪里?在于A重启之后以HW为标准截断了多余的日志。不截断行不行?不行,因为这个日志可能没被提交过(也就是没有被ISRs中的所有节点写入过),如果保留会导致日志错乱。
场景2-数据不一致

当我们有两个broker,Leader 和follower 均成功写入m1 消息的情况下,Leader 接收到生产者发送的m2消息并写入,follower 发送fetch请求,拿到数据后写入到页缓存(还没有刷盘),此时两个broker同时停电宕机,在这种情况下,如果丢失 n 个副本,那么丢失数据是可以接受的(Kafka 保证 n-1 个副本的持久性,如果某分区的ISR列表中空了,是不保证数据一定持久化的)。不幸的是,不同机器上的日志有可能出现分歧,当日志量少的副本B成为Leader后,继续接收生产者的消息m3,此时副本A为Follower,两者的HW值一致,不需要进行日志截断,但此时A和B的日志数据已经不一致了。
根本问题是消息被异步刷新到磁盘。这意味着,在崩溃之后,机器上的数据可能会丢失。当它们恢复时,任何broker 都可能成为Leader。如果Leader恰好是日志消息最少的机器,我们将丢失数据。虽然这在系统的持久性合约内,但问题是副本之间的日志可能是错乱的。
Leader Epoch 机制
为了解决上面提出的两个场景存在的问题,我们可以分析下产生这两个场景的原因是否有什么共性。
场景一提到因为follower的HW更新有延时,所以错误的截断了已经提交了的日志。场景二提到因为异步刷盘的策略,全崩溃的情况下选出的leader并不一定包含所有已提交的日志,而follower还是以HW为准,错误的判断了自身日志的合法性。所以,不论是场景一还是场景二,根本原因是follower的HW是不可靠的。
其实,如果熟悉raft的话,应该已经发现上面分析的场景和raft中的日志恢复很类似,raft中的follower是可能和leader的日志不一致的,这个时候会以leader的日志为准进行日志恢复。而raft中的日志恢复很重要的一点是follower根据leader任期号进行日志比对,快速进行日志恢复,follower需要判断新旧leader的日志,以最新leader的数据为准。
这里的leader epoch和raft中的任期号的概念很类似,每次重新选择leader的时候,用一个严格单调递增的id来标志,可以让所有follower意识到leader的变化。而follower也不再以HW为准,每次奔溃重启后都需要去leader那边确认下当前leader的日志是从哪个offset开始的。
KIP-101引入如下概念:
- Leader Epoch: leader纪元,单调递增的int值,每条消息都需要存储所属哪个纪元。
- Leader Epoch Start Offset: 新leader的第一个日志偏移,同时也标志了旧leader的最后一个日志偏移。
- Leader Epoch Sequence File: 每个节点都需要存储 Leader Epoch以及Leader Epoch Start Offset的变化记录。
- Leader Epoch Request: 由follower请求leader,得到请求纪元的最大的偏移量。如果请求的纪元就是当前leader的纪元的话,leader会返回自己的LEO,否则返回下一个纪元的Leader Epoch Start Offset。follow会用此请求返回的偏移量来截断日志。
我们再看下引入Leader Epoch之后是如何解决上面的两个场景的。
场景一:

副本A重启后作为follower,不是忙着以HW为准截断自己的日志,而是先发起LeaderEpochRequest询问副本B第0代的最新的偏移量是多少,副本B会返回自己的LEO为2给副本A,A此时就知道消息m2不能被截断,所以m2得到了保留。当A选为leader的时候就保留了所有已提交的日志,日志丢失的问题得到解决。
场景二:

副本A重启作为follower的第一步还是需要发起LeaderEpochRequest 询问leader当前第0代最新的偏移量是多少,由于副本B已经经过换代,所以会返回给A第1代的起始偏移(也就是1),A发现冲突后会截断自己偏移量为1的日志,并重新开始和leader同步。副本A和副本B的日志达到了一致,解决了日志错乱。
注:Leader Epoch 机制是在0.11 版本新加入的完善功能。每个分区文件夹下都有一个
leader-epoch-checkpoint
文件,里面记录了每一代Leader 的第一个偏移量。
副本同步参数
参数 | ||
---|---|---|
replica.lag.time.max.ms | follower副本与 leader 的最大延迟时间,超时后从ISR列表中移除 | 默认30s |
replica.fetch.min.bytes | 每个获取响应的最小字节数。 如果没有足够的字节,则等待 replica.fetch.wait.max.ms 时间 | |
replica.fetch.wait.max.ms | follower副本发出的每个 fetcher 请求的最大等待时间。 该值应始终小于replica.lag.time.max.ms,以防止ISR对低吞吐量的主题频繁收缩; | 默认500ms |
replica.high.watermark.checkpoint.interval.ms | 将高水位保存到磁盘的频率 | 默认5s |
生产者消息缓存设计
Kafka 生产者发送消息采用批量发送的设计,Producer在发送消息的时候,会将消息放到一个ProducerBatch中,这个Batch可能包含多条消息,然后再将Batch打包发送。
注:批量发送消息的相关配置参数有 batch.size、linger.ms ,相关的类有 RecordAccumulator、ProducerBatch 等。
batch.size
设置要批量发送的大小上限,如果批量缓存中的数据少于上限,则会等待 linger.ms
时间以累计更多的消息,消息大小达到上限或时间超时后,都会发送消息。如果 linger.ms
参数设置为0,意味着即使消息大小低于上限也会立即发送消息。
linger.ms
批量消息缓存延迟时间。

这样做的好处就是能够减少发起请求的次数,提高系统吞吐量。
但这样设计同时也带来一个问题,就是必须等到ProducerBatch 缓存塞满(达到batch.size
值)或者延迟时间到了(达到linger.ms
值),才会发送。如果生产的消息比较少的话,迟迟难以让Batch塞满,那么就意味着更高的延迟。
在不指定key和分区位置的情况下,如果将消息轮询到各个分区,本来消息就少,此时如果还遍历所有分区分配,那么每个ProducerBatch 就更难满足发送的条件了。
所以这里设想一下,假如我们优先让一个ProducerBatch 塞满,再给其他的分区分配是不是可以降低这个延迟呢?
粘性分区策略
粘性分区策略(Sticky Partitioner),简单点解释,就是在无key、无分区指定的情况下发送消息,消息会尽量分配到和上次一致的分区(和上次同主题),这样做的好处是解决了上面较高延迟的情况,一个ProducerBatch 会被优先塞满。
让我们看一个例子:Topic1 有3个分区, 此时给Topic1 发9条无key、无分区指定的消息, 这9条消息加起来都不超过batch.size
,看有无粘性分区有什么区别。


可以看到,使用粘性分区策略之后,至少是先把其中一个Batch填满了发送,然后再去填充另一个Batch。不至于向之前那样,虽然平均分配,但是导致一个Batch都没有放满,不能及时发送,造成较高延迟。
幂等性和事务
再说一下Kafka 消息交付的可靠性保障,所谓的消息交付可靠性保障,是指 Kafka 对 生产者和 消费者要处理的消息提供什么样的承诺。常见的承诺有以下三种:
- 最多一次(at most once):消息不会重复发送,但可能会丢失。
- 至少一次(at least once):消息不会丢失,但可能被重复发送。
- 精确一次(exactly once):消息不会丢失,也不会被重复发送。
Kafka 默认提供的交付可靠性保障是第二种,即至少一次。即只有 生产者成功“提交”消息且接到 Broker 的应答才会认为该消息成功发送。不过如果消息成功“提交”,但 Broker 的应答没有成功发送回 生产者(比如网络出现瞬时抖动),那么生产者就无法确定消息是否真的提交成功了。因此,它只能选择重试,也就是再次发送相同的消息。这就是 Kafka 默认提供至少一次可靠性保障的原因,不过这会导致消息重复发送。
Kafka 也可以提供最多一次交付保障,只需要让 Producer 禁止重试即可。这样一来,消息要么写入成功,要么写入失败,但绝不会重复发送。我们通常不会希望出现消息丢失的情况,但一些场景里偶发的消息丢失其实是被允许的,此时使用最多一次交付保障就是恰当的。
那么问题来了,Kafka 是怎么做到精确一次的呢?简单来说,这是通过两种机制:幂等性(Idempotence)和事务(Transaction)。它们分别是什么机制?两者是一回事吗?要回答这些问题,我们首先来说说什么是幂等性。
幂等性 Idempotence
幂等性是指某些操作或函数能够被执行多次,但每次得到的结果都是不变的。幂等性有很多好处,其最大的优势在于我们可以安全地重试任何幂等性操作,因为它们不会破坏我们的系统状态。如果是非幂等性操作,我们还需要担心某些操作执行多次对状态的影响,但对于幂等性操作而言,我们根本无需担心此事。
指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,enable.idempotence
参数被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。
幂等性的作用范围
Producer 幂等性使用起来很简单,但在使用之前必须要了解幂等性的作用范围。首先,它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。
那么如果想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务。这也是幂等性 Producer 和事务型 Producer 的最大区别!
事务 Transaction
Kafka 的事务概念类似于关系型数据库提供的事务。在数据库领域,事务提供 ACID 保证,即原子性(Atomicity)、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability)。而在实际场景中各家数据库对 ACID 的实现又各不相同。
Kafka 自 0.11 版本开始也提供了对事务的支持,它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。
Kafka事务能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
设置事务型 Producer 的方法也很简单,满足两个要求即可:
- 和幂等性一样,开启 enable.idempotence = true。
- 设置生产者端参数
transctional.id
,最好为其设置一个有意义的名字。
此外,你还需要在 Producer 代码中做一些调整,如这段代码所示:
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。因此在 Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。修改起来也很简单,设置 isolation.level
参数的值即可。当前这个参数有两个取值:
read_uncommitted
:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。read_committed
:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
小结:
简单来说,幂等性和事务型都是 Kafka 社区力图为实现精确一次处理语义所提供的工具,只是它们的作用范围是不同的。幂等性 Producer 只能保证单分区、单会话上的消息幂等性;而事务能够保证跨分区、跨会话间的幂等性。从交付语义上来看,自然是事务型 Producer 能做的更多。
不过,切记万事都有利弊。比起幂等性 Producer,事务型 Producer 的性能要更差,在实际使用过程中,我们需要仔细评估引入事务的开销,切不可无脑地启用事务。
数据存储与检索
一个主题 topic 可以有多个分区partition,每 个分区副本都对应一个 Log ,而 Log 又可以分为多个日志分段,这样也每个分区下由多个日志段文件segement 组成。使用Segement的好处能够保证单个的文件不会很大,方便删除,同时也避免了单个日志文件无限增大。
Segment 日志段文件
segment文件由两部分组成,分别为 .log
日志文件和 .index
索引文件, Segment 的大小可以通过log.segment.bytes
参数进行配置(默认为1Gb)。
日志段文件及相关文件的命令规则为:分区副本的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值(偏移量),大小为64位,20位数字字符长度,没有数字用0填充。
00000000000000000000.log
00000000000000000000.index
00000000000000000000.timeindex
00000000000000000119.log
00000000000000000119.index
00000000000000000119.timeindex
00000000000000000119.snapshot
leader-epoch-checkpoint
上面的文件一共分为5类,index
、log
、timeidex
、snapshot
和leader-epoch-checkpoint
。
- Log:是真正的消息内容
- Index: index文件是消息的物理地址的索引文件。
- timeindex:它是映射时间戳和相对offset
- snapshot: 记录了producer的事务信息
- leader-epoch-checkpoint: 保存了每一任leader开始写入消息时的offset, 会定时更新;
Message物理结构

参数说明:
关键字 | 解释说明 |
---|---|
8 byte offset | 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset), 它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message |
4 byte message size | message大小 |
4 byte CRC32 | 用crc32校验message |
1 byte “magic” | 表示本次发布Kafka服务程序协议版本号 |
1 byte “attributes” | 表示为独立版本、或标识压缩类型、或编码类型。 |
4 byte key length | 表示key的长度,当key为-1时,K byte key字段不填 |
K byte key | 可选 |
value bytes payload | 表示实际消息数据。 |
根据索引查询消息
索引机制
在kafka中,每个日志分段文件都对应了两个索引文件——.inde
偏移量索引文件和 .timeindex
时间戳索引文件,主要用来提高查找消息的效率。
- 偏移量索引文件:用来建立消息偏移量 offset 到物理地址 position 之间的映射关系,方便快速定位消息所在的物理文件位置。
- 时间戳索引文件:则根据指定的时间戳 timestamp 来查找对应的偏移量 offset 信息。
Kafka 中的索引文件,以稀疏索引的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由 broker 端参数 log.index.interval.bytes
指定,默认值为4KB)的消息时,偏移量索引文件 和 时间戳索引文件 分别增加一个偏移量索引项和时间戳索引项,增大或减小 log.index.interval.bytes
的值,对应地可以缩小或增加索引项的密度。
稀疏索引通过 MappedByteBuffer
将索引文件映射到内存中,以加快索引的查询速度。
偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量(未命中索引项的情况)。
时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。
稀疏索引的方式是考虑磁盘空间、内存空间、查找时间等多方面因素的一个折中方案,它减少索引文件大小,节省了磁盘空间,同时也适当增大了查找时间。
下面以偏移量索引文件来做具体分析。偏移量索引的格式如下图所示:
Index每一个索引项为8字节,其中相对offset占用4字节,消息的物理地址(position)占用4个字节

偏移量索引文件与日志文件的对应关系:

查找逻辑:
- 通过日志段文件名先找到 offset=14 的 message 所在的 segment文件(利用二分法查找)
- 找到的 segment 对应的偏移量索引文件,用查找的offset 减去.index文件名的offset,也就是00000.index文件,我们要查找的offset为3的message在该.index文件内的索引为14,正好命中索引,返回索引项14对应的物理地址。(如果查找的偏移量为15,此时没有命中索引,则选择小于指定偏移量的最大索引项,即还是索引项14对应的物理地址)
- 根据物理偏移地址,去.log文件中找相应的Message (没有命中的索引需要做一次顺序扫描,但是扫描的范围很小)
为什么这么快
Kafka是一个高性能的分布式系统,为什么这么快?主要有以下几点:
- 顺序读写
- 零拷贝
- 消息压缩
- 分批发送
持久化
注:以下描述大部分来自官方文档(官网已经解释的很清楚了)
Kafka 严重依赖文件系统来存储和缓存消息。人们普遍认为“磁盘很慢”,这使人们怀疑持久结构能否提供具有竞争力的性能。事实上,磁盘比人们预期的要慢得多,也快得多,这取决于它们的使用方式。一个设计合理的磁盘结构通常可以和网络一样快。
关于磁盘性能的关键事实是,在过去十年中,硬盘驱动器的吞吐量一直被磁盘寻道的延迟所影响。因此,在具有六个 7200rpm SATA RAID-5 阵列的 JBOD 配置上线性写入的性能约为 600MB/秒,但随机写入的性能仅为约 100k/秒——相差超过 6000 倍。这些线性读写是所有使用模式中最可预测的,并且由操作系统进行了大量优化。现代操作系统提供预读和后写技术,以大块倍数预取数据,并将较小的逻辑写入分组为大型物理写入。他们实际上发现顺序磁盘访问在某些情况下可能比随机内存访问更快!
为了弥补这种性能差异,现代操作系统越来越积极地使用主内存进行磁盘缓存。现代操作系统很乐意将所有空闲内存转移到磁盘缓存中,而在回收内存时几乎没有性能损失。所有的磁盘读写都会经过这个统一的缓存。如果不使用直接 I/O,则无法轻松关闭此功能,因此即使进程维护数据的进程内缓存,此数据也可能会在 OS 页面缓存中复制,从而将所有内容存储两次。
此外,我们是在 JVM 之上构建的,任何花时间研究 Java 内存使用的人都知道两件事:
- 对象的内存开销非常高,通常会使存储的数据大小翻倍(或更糟)。
- 随着堆内数据的增加,Java 垃圾收集变得越来越繁琐和缓慢。
由于这些因素,使用文件系统和依赖页面缓存优于维护内存中缓存或其他结构——我们通过自动访问所有空闲内存至少使可用缓存翻倍,并且可能通过存储紧凑型缓存(压缩)再次翻倍字节结构而不是单个对象。这样做会在 32GB 的机器上产生高达 28-30GB 的缓存,而不会受到 GC 的影响。此外,即使服务重新启动,此缓存也会保持温暖,而进程内缓存需要在内存中重建(对于 10GB 缓存可能需要 10 分钟),否则它需要从完全冷的缓存开始(这可能意味着糟糕的初始性能)。这也大大简化了代码,因为用于维护缓存和文件系统之间一致性的所有逻辑现在都在操作系统中,这往往比一次性的进程内尝试更有效、更正确。如果您的磁盘使用倾向于线性读取,那么预读实际上是在每次磁盘读取时使用有用数据预先填充此缓存。
这表明了一种非常简单的设计:与其在内存中维护尽可能多的内容,并在空间不足时将其全部刷新到文件系统中,不如将其反转。所有数据都会立即写入文件系统上的持久日志,而不必刷新到磁盘。实际上这只是意味着它被转移到内核的页面缓存中。
注:这种以页面缓存为中心的设计风格在一篇关于 Varnish 设计的文章中进行了描述。
总结:
官网的这段话说明了Kafka的持久化设计,利用了内核的页面缓存,而不是进程内存;利用了磁盘的顺序读写,而不是随机读写;而且支持存储被压缩后的数据,不仅节省了服务器的磁盘空间、也减少了服务器与客户端的网络带宽,这样的设计决定了Kafka的极致性能表现。
Sendfile技术
Sendfile是一种在计算机操作系统中进行网络数据传输的优化技术,它允许将文件从磁盘读取到内核空间缓冲区中,然后将缓冲区中的数据直接传输到网络套接字中,从而避免了在用户空间和内核空间之间进行数据复制的开销,减少了数据传输的延迟和CPU使用率。
在Unix/Linux操作系统中,sendfile()系统调用实现了Sendfile技术。它的参数包括源文件描述符、目标套接字描述符、发送的字节数以及可选的起始位置。调用sendfile()会将源文件中的数据直接传输到目标套接字,而无需将数据从内核缓冲区复制到用户空间。
Sendfile技术在高性能网络应用程序中得到广泛的应用,比如Web服务器、视频流媒体服务器等,它可以提高数据传输的效率,从而提升整个应用程序的性能。
mmap 技术
mmap是一种在计算机操作系统中实现内存映射的技术,它将一个文件或者设备映射到进程的地址空间中,从而使得进程可以像访问普通内存一样来读写文件或者设备,而无需进行繁琐的文件I/O或设备I/O操作。
在Unix/Linux操作系统中,mmap()系统调用实现了内存映射技术。调用mmap()会在进程的地址空间中创建一个新的映射区域,并将指定的文件或者设备映射到该区域中。进程可以使用指针来访问这个映射区域,就像访问普通内存一样。当进程对映射区域进行读写操作时,操作系统会将这些操作直接转换为对文件或者设备的读写操作。
mmap技术有以下优点:
- 避免了繁琐的文件I/O或设备I/O操作,提高了数据访问的效率;
- 省去了操作系统在用户空间和内核空间之间进行数据复制的开销;
- 简化了文件或者设备的读写操作,使得代码更加简洁易读。
mmap技术在一些高性能应用程序中得到广泛的应用,比如数据库系统、搜索引擎、图像处理等。它可以大大提高数据访问的效率,从而提升整个应用程序的性能。
数据安全
Kafka支持基于SSL和基于SASL的安全认证机制,你可以使用SSL做通信加密,使用SASL做认证实现。
客户端认证
SASL下又细分多种认证机制:
- GSSAPI
- PLAIN
- SCRAM
- OAUTH-BEARER
- Delegation Token
授权
实践
Kafka在美团数据平台的实践
Kafka在美团的集群规模总体机器数已经超过了15000+台,单集群的最大机器数也已经到了2000+台。(2022年8月)

如图所示,蓝色部分描述了Kafka在数据平台定位为流存储层。主要的职责是做数据的缓存和分发,它会将收集到的日志分发到不同的数据系统里,这些日志来源于系统日志、客户端日志以及业务数据库。下游的数据消费系统包括通过ODS入仓提供离线计算使用、直接供实时计算使用、通过DataLink同步到日志中心,以及做OLAP分析使用。
QQ音乐的日志采集架构
使用ELK 构建日志处理平台,提供无侵入、集中式的远程日志采集和检索系统

Filebeat 作为日志采集和传送器。Filebeat监视服务日志文件并将日志数据发送到Kafka。
Kafka 在Filebeat和Logstash之间做解耦。
Logstash 解析多种日志格式并发送给下游。
ElasticSearch 存储Logstash处理后的数据,并建立索引以便快速检索。
Kibana 是一个基于ElasticSearch查看日志的系统,可以使用查询语法来搜索日志,在查询时制定时间和日期范围或使用正则表达式来查找匹配的字符串。
其他公司
微信、拼多多、哔哩哔哩、微众银行
Confluent 公司
2014年11月,几个曾在领英为Kafka工作的工程师,创建了名为 Confluent 的新公司,着眼于Kafka,2021年6月,Confluent 在美国纳斯达克上市,市值超过100亿美元。
理论上Confluent 公司提供的Kafka服务应该是最具权威和成熟的。
README
修改记录:
银法王 2021-08-16 第一次修订
银法王 2022-09-06
银法王 2023-02-02 补充幂等性、事务、流计算相关内容
参考:
Kafka-工作流程,文件存储机制,索引机制,如何通过offset找到对应的消息
http://www.kafka.cc/archives/26.html
《深入理解Kafka核心设计与实践原理》朱忠华 2019年