Kafka梳理

消息队列

一般什么时候用消息队列

消息队列一般主要解决应用耦合、异步消息、流量削锋等问题。

消息队列的通信方式

消息队列(Message Queue)是一种进程间通信或同一进程的不同线程间的通信方式。

消息中间件选型

特性 ActiveMQ RabbitMQ RocketMQ Kafka redis/db
单机吞吐量 万级,吞吐量比RocketMQ和Kafka要低不止一个数量级 万级,吞吐量比RocketMQ和Kafka要低不止一个数量级 十万级,RocketMQ也是可以支撑高吞吐的一种MQ 百万级别,Kafka最大优点就是吞吐量大,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
优势 非常成熟,功能强大,在业内大量公司和项目中都有应用 erlang语言开发,性能极好、延时很低,吞吐量万级、MQ功能完备,管理界面非常好,社区活跃;互联网公司使用较多 接口简单易用,阿里出品有保障,吞吐量大,分布式扩展方便、社区比较活跃,支持大规模的Topic、支持复杂的业务场景,可以基于源码进行定制开发 超高吞吐量,ms级的时延,极高的可用性和可靠性,分布式扩展方便
可用性 高,基于主从架构实现可用性 高,基于主从架构实现可用性 非常高,分布式架构 非常高,Kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
功能支持 MQ领域的功能及其完备 基于erlang开发,所以并发性能极强,性能极好,延时低 MQ功能较为完备,分布式扩展性好 功能较为简单,主要支持加单MQ功能
支持协议 STOMP AMQP,XMPP, SMTP,STOMP 基于TCP/IP自定义的协议 基于TCP/IP自定义的协议
消息可靠性 有较低的概率丢失数据 - 经过参数优化配置,可以做到零丢失 经过参数配置,消息可以做到零丢失
时效性 ms级 微秒级,这是rabbitmq的一大特点,延迟是最低的 ms级 延迟在ms级以内
劣势 偶尔有较低概率丢失消息,社区活跃度不高 吞吐量较低,erlang语音开发不容易进行定制开发,集群动态扩展麻烦 接口不是按照标准JMS规范走的,有的系统迁移要修改大量的代码,技术有被抛弃的风险 有可能进行消息的重复消费
应用 主要用于解耦和异步,较少用在大规模吞吐的场景中 都有使用 用于大规模吞吐、复杂业务中 在大数据的实时计算和日志采集中被大规模使用,是业界的标准
开发语言 java Erlang java Scala/Java
Topic数量对吞吐量的影响 - - Topic可以达到几百、几千个的级别,吞吐量会有小幅度的下降。这是RocketMQ的一大优势,可在同等数量机器下支撑大量的Topic Topic从几十个到几百个的时候,吞吐量会大幅下降。所以在同等机器数量下,Kafka尽量保证Topic数量不要过多。如果支撑大规模Topic需要增加更多的机器

Kafka

Kafka简介

Kafka都有哪些特性?

分布式 高性能 持久性和扩展性
多分区 高吞吐 数据可持久化
多副本 低延迟 容错性
多订阅者 高并发 支持水平扩展
基于ZooKeeper调度 时间复杂度O(1) 消息自动平衡

Kafka的使用场景

  • 日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、HBase、Solr等;
  • 消息系统:解耦和生产者和消费者、缓存消息等;
  • 实时计算:比如spark streaming和 Flink等流式处理;
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘;
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

Kafka为什么速度快?

参考链接

  1. 顺序读写磁盘:一般而言要高出磁盘随机读写三个数量级,一些情况下磁盘顺序读写性能甚至要高于内存随机读写。
  2. Page Cache:
  3. 零拷贝
  4. 分区分段+索引
  5. 批量读写
  6. 批量压缩

ZooKeeper是如何管理Kafka的

Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所有topic的分区副本分配和leader选举等工作。 Controller的管理工作都是依赖于 Zookeeper的。

架构设计

设计思想

Kafka是基于发布者-订阅者模式的消息队列组件。

消费者组

Kafka按消费者组来消费消息,每个消息只能被Consumer Group中的一个Consumer消费,不同Consumer可以消费同一消息。

消息状态

在Kafka中,消息是否被消费的状态保存在Consumer中,Broker不会关心消息是否被消费或被谁消费,Consumer会记录一个offset值(指向partition中下一条将要被消费的消息位置),如果offset被错误设置可能导致同一条消息被多次消费或者消息丢失。

如此可能会导致的问题:重复消费,消息丢失。

消息持久化

Kafka会把消息持久化到本地文件系统中,并且具有极高的性能。

批量发送

Kafka支持以消息集合为单位进行批量发送,以提高效率。

Push & Pull

Kafka采用的是拉模式。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout

分区机制

Kafka的Broker端支持消息分区,Producer可以决定把消息发到哪个Partition,在一个Partition中消息的顺序就是Producer发送消息的顺序,一个Topic中的Partition数是可配置的,Partition是Kafka高吞吐量的重要保证。

Kafka工作流程简图

系统结构

Kafka拓扑结构

Topic & Partition & Offset

Topic在逻辑上可以被认为是一个Queue,Kafka中每条消息都必须指定一个Topic,一个Topic中的消息可以分布在集群中的多个Broker中,Consumer根据订阅的Topic到对应的Broker上去拉取消息。为了提升整个集群的吞吐量,物理上一个Topic可以分成多个Partition,每个Partition在磁盘上对应一个文件夹,该文件夹下存放了这个Partition的所有消息文件和索引文件。假设有topic1和topic2两个Topic,且分别有13个和19个分区,则整个集群会生成32个文件夹。

“RECORD”部分就是Kafka的消息格式,一条完整的消息包含RECORD、offset以及message size。其中offset用来标识它在Partition中的偏移量,这个offset是逻辑值,而非实际物理偏移值,message size表示消息的大小。

这里要注意,因为Kafka读取消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高Kafka性能无关。

同时,因为offet由Consumer控制,所以Kafka Broker是无状态的,它不需要标记消息是否被消费过,也不需要通过Broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,从而保证了Kafka的高吞吐率

消息文件格式

消息发送

Producer发送消息到Broker时,会根据Paritition机制选择将消息存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同的Partition中,极大的提高了吞吐率。

所谓的Partition机制也就是Poducer消息partitioning策略,具体有以下几种策略:

轮询策略

轮询策略是Kafka Java客户端生产者的默认策略,轮询策略的负载均衡表现非常优秀,总能保证消息最大限度地被平均分配到所有分区上,默认情况下它是最合理的分区策略。

随机策略

随机策略默认从Partition列表中随机选择一个。

按消息键保序策略

Kafka允许为每条消息定义消息键,简称为Key(Key可以是一个有明确业务含义的字符串:客户代码、部门编号、业务ID、用来表征消息的元数据等)。一旦消息被定义了Key,可以保证同一个Key的所有消息都进入到相同的分区里,由于每个分区下的消息处理都是顺序的,所以这个策略被称为按消息键保序策略。

消息消费

Producer在生产消息的时候就涉及消息发送策略,而涉及到消息消费策略是发生在:1. 同一个Consumer Group内新增消费者;2. 消费者离开当前所属的Group; 3. 订阅的主题新增Partition。

Range策略

对于一个topic,对分区进行排序,然后range平均分配给Consumer Group,余的几个partitions交给前面几个Consumer Group消费。

这样带来的问题是,如果topic很多,总是前几个Consumer Group 额外消费。

RoundRobin策略

把所有topic的partition排序之后轮询分配给Consumer Group,这样带来的好处是负载均衡。但这种策略要保证两个前提:1. 一个消费者组消费的topic得是相同的;2. 每个topic的实例个数也是相同的

高可用

Kafka的高可用是说,在一个或多个Broker宕机后,其他Broker及所有Partition都能继续提供服务,且存储的消息不丢失。

消息备份机制

消息备份

Kafka的一个高可用特性体现在允许Partition存在多个副本(Replica)。

生产者直接将消息发送给Partition的leader,然后follower周期性地向leader请求同步数据。

Kafka在分配leader和follower的时候,要让leadr平均分配在不同的broker上,同一个partition的leader和follower不能在同一个broker上。

Kafka消息备份

ISR

ISR(In-Sync Replicas)指的是一个Partition中与Leader“保持同步”的Replica列表(实际存储的是副本所在Broker的BrokerId),这里的保持同步不是指与Leader数据保持完全一致,只需在replica.lag.time.max.ms(默认值为500)时间内与Leader保持有效连接。

Acks

生产者发送消息中包含acks字段,该字段代表Leader应答生产者前Leader收到的应答数。

  1. ack = 0:生产者无需等待服务端的任何确认,消息被添加到生产者套接字缓冲区后就视为已发送,因此acks=0不能保证服务端已收到消息,使用场景较少;
  2. ack = 1:Leader将消息写入本地日志后无需等待Follower的消息确认就做出应答。如果Leader在应答消息后立即宕机且其他Follower均未完成消息的复制,则该条消息将丢失;
  3. ack = all:Leader将等待ISR中的所有副本确认后再做出应答,因此只要ISR中任何一个副本还存活着,这条应答过的消息就不会丢失。acks=all是可用性最高的选择,但等待Follower应答引入了额外的响应时间。Leader需要等待ISR中所有副本做出应答,此时响应时间取决于ISR中最慢的那台机器。

LEO & HW

Kafka Replicas 中有两个重要的概念。

LEO(log end offset) :即日志末端偏移,指向了副本日志中下一条消息的位移值(即下一条消息的写入位置)

**HW(high watermark)**,即已同步消息标识,因其类似于木桶效应中短板决定水位高度,故取名高水位线

  1. 消费者仅可消费各分区Leader高水位线以下的消息;
  2. Leader的HW值由ISR中的所有备份的LEO最小值决定(Follower在发送FetchRequest时会在PartitionFetchInfo中会携带Follower的LEO)
  3. 对于任何一个副本对象而言其HW值不会大于LEO值

Kafka-HW&LEO

数据一致性的保障

follower 故障:follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后, follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
leader 故障:leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的数据一致性, 其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据。
注意: 这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

故障恢复

Controller

Kafka需要在集群所有Broker中选出一个Controller负责各Partition的Leader选举以及Replica的重新分配

Kafka使用ZooKeeper存储Broker、Topic等状态数据,Kafka集群中的Controller和Broker会在ZooKeeper指定节点上注册Watcher(事件监听器),以便在特定事件触发时,由ZooKeeper将事件通知到对应Broker。

高性能

批量发送消息

Partition

Kafka的消息是一个一个的键值对,键可以设置为默认的null。键有两个用途,可以作为消息的附加信息,也可以用来决定该消息被写入到哪个Partition。

Kafka通过将Topic划分成多个Partition,Producer将消息分发到多个本地Partition的消息队列中,每个Partition消息队列中的消息会写入到不同的Leader节点。消息经过路由策略,被分发到不同的Partition对应的本地队列,然后再批量发送到Partition对应的Leader节点。

Kafka批量发送消息

消息路由

Round Robin:Producer将消息均衡地分配到各Partition本地队列上,是最常用的分区策略。

散列:Kafka对消息的key进行散列,根据散列值将消息路由到特定的Partition上,键相同的消息总是被路由到相同的Partition上。

自定义分区策略:Kafka支持自定义分区策略,可以将某一系列的消息映射到相同的Partition。

消息持久化

  1. Kafka采用顺序IO读写磁盘;
  2. 通过索引文件提高对磁盘的查询效率;

零拷贝

数据通过DMA拷贝到内核态Buffer后,直接通过DMA拷贝到NIC Buffer,无需CPU拷贝,这也是零拷贝这一说法的来源。除了减少数据拷贝外,因为整个读文件-网络发送由一个sendfile调用完成,整个过程只有两次上下文切换,没有cpu数据拷贝,因此大大提高了性能。

Linux 2.4+内核通过sendfile系统调用,提供了零拷贝。

Kafka的数据传输通过TransportLayer来完成,其子类PlaintextTransportLayer通过Java NIO的FileChannel的transferTo()和transferFrom()方法实现零拷贝。transferTo()和transferFrom()并不保证一定能使用零拷贝,实际上是否能使用零拷贝与操作系统相关,如果操作系统提供sendfile这样的零拷贝系统调用,则这两个方法会通过这样的系统调用充分利用零拷贝的优势,否则并不能通过这两个方法本身实现零拷贝。

cpu四次拷贝

零拷贝

一些常见问题的解决方案

数据丢失

producer丢消息

  1. 在发送过程中发生网络抖动,导致消息没有发送到broker;
  2. 消息本身不合规(消息太大)导致broker拒收。

解决方案:发送端增加重试机制。

broker丢消息

  1. follower没来得及完全同步leader的数据,leader下线了,重新选举的follower信息不完全。

解决方案:

  • 通过设置,保证每个partition 的副本至少要2个以上;
  • 通过设置,保证leader至少与一个follower 正确同步数据;
  • ack = all,所有副本同步完之后,才认为是写入成功。

consumer丢消息

  1. consumer自动提交了offset,让broker认为该offset已经被消费,但在处理消息时机器宕机了,导致这条消息没有真正被处理。

解决方案:关闭自动更新offset,等到数据被处理后再手动跟新offset。

重复消费

重复消费的原因在于:已经消费了数据,但是offset没来得及提交。

  1. 强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等)
  2. 设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。
  3. 消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会reblance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
  4. 当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。
  5. 当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。
  6. 并发很大,可能在规定的时间(session.time.out默认30s)内没有消费完,就会可能导致reblance重平衡,导致一部分offset自动提交失败,然后重平衡后重复消费

解决方案:

  1. 提高消费者处理速度,或增加timeout值;
  2. 引入消息去重机制:生产端生产消息时,加入唯一id,消费端保存最近消费记录,消费的时候通过redis查询去重再消费;

数据积压

数据积压导致来不及消费怎么办?

解决方案:

  1. 先修复 consumer的问题,确保其恢复消费速度,然后将现有 consumer都停掉。
  2. 新建一个 topic, partition是原来的10倍,临时建立好原先10倍的 queue数量
  3. 然后写一个临时的分发数据的 consumer程序,这个程序部署上去消费积压的数据,消费之 后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的 queue。
  4. 接着临时征用10倍的机器来部署 consumer,每一批 consumer消费一个临时 queue的数据。这种做法相当于是临时将 queue资源和 consumer资源扩大10倍,以正常的10倍速度来消费数据。
  5. 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer机器来消费消息。