孙宇的技术专栏 大数据/机器学习

Kafka基础原理

2021-09-14

阅读:


配置项

zookeeper.connect

kafka 使用 zookeeper 来保存生产者和topic元数据,消费者元数据、分区偏移量

通过 zookeeper.connect 可以指定 kafka 使用的 zookeeper 的ip及端口,以及用来存储元数据的地址。格式是:ip:host/path 如果不指定 path,会默认使用根路径。如果指定的 path不存在,在启动 broker 时会创建它。

在实际使用时,最好指定 path。因为 zookeeper 集群可能给不同的服务使用,通过 path 能进行很好的区分。如果有不同的 kafka 集群都连这个 zookeeper,通过 path 区分就不会出现冲突。

log.dirs

kafka 将消息存在本地磁盘上。log.dirs 用来指定消息存放的路径。可以通过逗号分隔,指定多个路径。如果指定了多个路径,broker 会采用【最少使用原则】,将同一分区的内容保存在同一路径下。

broker 会往拥有最少数量分区的路径新增分区,而不是往磁盘空间最小的路径新增分区

默认情况下,kafka会在三种动作下自动创建 topic:

  1. 任意一个客户端向 topic 发送元数据
  2. 一个生产者往 topic 写数据时
  3. 一个消费者从 topic 取数据时

如果要关闭这个默认动作,需要手动显式的去创建 topic,需要把配置文件中 auto.create.topics.enable 设置为 false

num.partitions

num.partitions 指定新创建的 topic 包含多少个分区。

如果topic的自动创建功能(auto.create.topics.enable)是打开的。它会创建 num.partions 数量的分区。该值默认是 1。我们可以增加分区的数量,但不能减少。

比如,num.partions 设置为 3。在后序的使用中,我们可以把该值改成 4,但不能改成 2.

在使用中,是通过分区数进行横向扩展的。当有新的 broker 加入集群时,可以增加分区数量来实际负载均衡。具体一个 topic 需要多少个分区,需要根据实际需要的吞量来进行估算:

  1. 一个分区和般会有一个消费者
  2. 假如消费者 1 秒可以处理 100 条数据。生产者 1 秒存入 1000 条数据
  3. 那么,就需要 10 个消费者。也就是说,要 10 个分区。

log.retention

log.retention 控制 kafka 将消费保存多久。有三个值可以指定,即:

log.retention.hour, log.retention.minutes, log.retention.ms

默认是 168 小时。这三个参数的作用是一样的。但如果这三个参数都设置了值,kafka 会优先使用最小的那个时间。

除了通过时间来控制消息保存的时间,还可以通过 log.retention.bytes 来实现保存一定数量的数据。该值应用到每个分区上。

比如:有8个分区,该 log.retention.bytes 的值设置为 1G。那么,最多能保存 8G 的数据。如果要扩展,就要通过增加分区的形式来实现。

log.segment

一个消息到达 broker 时,它会被追加到分区当前的文件中。当文件大小到达 log.segment.bytes 时(该值默认 1G),文件会被关闭,然后打开一个新的文件。 被关闭的文件就开始等待过期,过期的动作受到 log.retention 的影响。即:过期时间和过期大小。

log.segment.bytes 这个值设置的越小,关闭文件、打开文件操作会越频繁,这会损耗性能影响效率。

如果一个 topic一天接收 100MB 的数据,log.segment.bytes 的值是 1G。那么,需要 10 天才会关闭这个文件。

过期时间设置的是 7 天,【文件在关闭前是不会过期的,在关闭后才开始计时关闭】。所以,这里的消息要 17 天才会过期。【文件中最后一条消息过期后,文件才真正过期,这时候文件会被删除】。这里的值,可以用来评估需要多大的磁盘空间。

除了通过 log.segment.bytes 的大小值来控制文件的关闭,还可以通过时间来周期性的关闭。即:log.segment.ms

这两个条件,哪个先满足,就执行文件关闭。所以不会出现互斥的情况。默认情况下是只使用 log.segment.bytes 来进行。

message.max.bytes

message.max.bytes 限制了单个消息的大小。如果超过这个值,kafka 会拒收,同时返回错误信息。默认值是 1MB,该大小是指压缩后的大小,实际的大小可能远大于该值

该值越大,处理网络连接、磁盘写入等就会消耗越多的性能。所以,kafka 不太建议用来做很大单条数据的处理,推荐用来做小而快的消息处理。

与该值匹配的还有另外一个值:fetch.message.max.bytes ,它是消费端能获取的消息的最大值。如果该值设置小了,就会导致消费端消费不及时。

生产调优

MaxGCPauseMillis 每次垃圾回收暂停时间。默认是 200ms。

InitiaingHeapOccupancyPercent 回收前可使用堆的百分比。默认 45%。即:在堆内存使用达到 45% 前不会进行垃圾回收。

Kafka 对堆内存使用率非常高,容易产生垃圾对象。所以可以把这两个值设置的稍小一点。尽早进行垃圾回收。如:200ms, 35%

broker 写入数据

kafka 使用 zookeeper 来维护集群各成员的信息。每个 broker 都有一个唯一的标识符,这个标识可以在配置文件中指定,也可以自动生成。启动 broker 时,它通过创建临时节点把自己的ID注册到 Zookeeper 上。Kafka 各组件监听Zookeeper 的 /brokers/ids(在配置文件中配置的路径)。当有 broker 加入集群或退出集群时,各组件就会获得通知。如果有 broker 的ID重复,启动时会报错,因为在 Zookeeper 上创建同名节点是不会成功的。

消息写入时,我们先创建 ProducerRecord 对象。它包含了目标topic、分区(可不配置)、键(可不配置)、要发送的内容。数据被发送给【分区器】。如果配置了分区,分区器则不会任何处理。有了分区,生产者就知道往哪个 topic 和分区写入。接着,数据被添加到一个消息批次里,这个批次里的消息会被发到同一个 topic 和分区里。会有专门的线程把这些消息发送到对应的 broker 里。

服务器收到这些消息时会返回一个响应。如果成功写入 Kafka,会返回一个 RecordMetaData 对象,它包含 topic 和分区信息,以及数据在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误后会重试 N 次,重试后还是失败,则会返回错误信息。

生产者

要往 Kafka 写入消息,需要创建 Kafka 对象,并设置一些属性。三个必填的参数分别是:

  1. bootstrap.servers。指定 broker 的清单,格式为:host:port,多个 broker 之间用逗号分隔。生产者会从一个 broker 里获取到其它所有 broker 的信息。但为了防止提供的 broker 挂掉了。所以最好是提供两个 broker 信息。
  2. key.serializer。键的序列化类。因为 broker 希望接收到的消息都是字节数组。所以要进行序列化。默认提供了常见类型的序列化类。如:String, Integer 。
  3. value.serializer。值的序列化器。如果 key 和 value 同类型,可以使用相同的序列化类。如果不同,则要分别配置不同的。

发送消息时,可以用三种方式发送:

  1. 同步发送。发送后进行等待结果返回。
  2. 异步发送。发送时指定一个回调函数。无须等待。
  3. 发送并忘记。发送后不做其它处理。这样可能会丢失数据。

分区

Kafka 的基本存储单元是分区,分区无法在多个 broker 间再细分,也无法在同一个 broker 的多个磁盘上细分。所以,分区的大小受到单个挂载点可用空间的限制。

Kafka 的消息是一个个键值对。在 ProducerRecord 对象里,键的值可以为空。但最好是配置上。它可以决定这个数据被存在哪个分区。拥有相同键的消息会被发送到同一个分区。

如果键的值为 null,且使用了默认的分区器。Kafka 会将记录随机发送到各个可用的分区上。

如果键值不为 null,且使用默认分区器。Kafka会对键进行散列(使用自己的散列算法)。散列算法会把所有的分区都用上,不管这个分区是否有问题。那么,如果散列到的分区不可用。会出问题。因为:相同键值的信息会发送到同一个分区

除了使用默认的分区器外,还可以自定义实现分区器。需要自己实现 Partitioner接口。

参数配置

acks

指定了需要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。 如果 acks = 0,表示不做判断,这种处理的吞吐量最大。 如果 acks = 1,表示只要 master 接收到了就认为成功了。 如果 acks = all,表示所有参与复制的节点全部收到消息才会认为成功,这时候才会返回客户端成功的标识。

buffer.memory

生产者内存缓冲区的大小,用来缓冲要发送到服务器的消息。如果应用程序send()消息的速度超过发送到服务器的速度,导致配置的缓冲区空间不足。这时候 send() 会被阻塞。

compression.type

默认情况下,消息发送时不会被压缩。该参数可以配置压缩算法,如:snappy、gzip、lz4。 使用压缩算法后,可以降低网络传输开销和存储开销,但会损耗性能。 snappy占用较少的CPU,提供较好的性能和可观的压缩比。gzip 战胜较多的CPU,提供更高的压缩比。

retries

生产者发消息时,如果收到错误信息,会重试。这个参数就配置了重试的次数。每次重试之间会等待 100ms。也可以通过 retry.backoff.ms来改变该值。重试次数达到,如果还没成功,就会返回错误。

batch.size

如果多个消息要被发送到同一个分区,生产者会把它们放在同一批次里一起提交。这个参数指定了一个批次可以使用的内存大小。嫁到字节计算,不是消息个数。如果设置的太大,消息会有延迟。如果太小,又会增加一些额外的开销。

linger.ms

除了通过批次内存大小来控制批量提交的行为。还可以通过时间来控制。当 linger.ms 时间达到时,也会触发提交的动作。

client.id

唯一识别码。各个broker不要重复

max.in.flight.requests.per.connection

指定生产者在收到服务器响应之前可以发送多少个消息。如果设置成很大的值,就可以实现异步的效果,提升吞量,占用更多的内存。如果设置为 1,可以保证消息是按发送的顺序写入的。

max.request.size

发送请求的大小。如果是批量提交,这整个批次的数据大小加起来不能超过该值

consumer 消费数据

消费者和消费者群组

消费者用来从 topic 订阅、读取并消费数据。但实际中一个消费者往往跟不上生产者的速度,所以需要用一组消费者消费同一个 topic 的消息。这就是消费者群组。

Kafka的消费者属于一个消费者群组。一个群组里的消费者访问相同 topic 的消息。每个消费者接收该 topic 一部分分区的消息。

如果消费者的数量等于分区的数量。每个消费者会被分配一个分区。

如果消费者的数量大于分区的数量。有的消费者会闲置。

如果我们创建多个群组消费同一 topic。每个群组都会接收到全量的消息。那么,我们就可以在业务上将一串复杂的逻辑处理拆分成多个小的处理。交给多个群组来并行处理。当然,前提是这些逻辑处理是没有顺序依赖的。

当群组里添加或减少消费者时,群组会进行再均衡,重新分配分区。在重新分配期间,所有消费者是不可用的。这会影响效率。

消费者通过向被指派为群组协调器的 broker 发送心跳来维持状态。如果超出时间未发送心跳,协调器会认为该消费者已经挂掉,这时候会进行再均衡。

创建消费者

通过创建 KafkaConsumer 对象来创建消费者。它和生产者非常类似,也有几个必填参数:

  1. bootstrap.servers。指定 broker 的清单,格式为:host:port,多个 broker 之间用逗号分隔。生产者会从一个 broker 里获取到其它所有 broker 的信息。但为了防止提供的 broker 挂掉了。所以最好是提供两个 broker 信息。
  2. key.serializer。键的序列化类。因为 broker 希望接收到的消息都是字节数组。所以要进行序列化。默认提供了常见类型的序列化类。如:String, Integer 。
  3. value.serializer。值的序列化器。如果 key 和 value 同类型,可以使用相同的序列化类。如果不同,则要分别配置不同的。
  4. group.id。用来表明消费者是属于哪个消费群组。

订阅topic

创建完消费者后,可以订阅某个或多个 topic 了。也可以在订阅时传入一个正则表达式。如果有人创建了新的 topic,且符合该正则,那会立即触发一次再均衡。

轮询

消息轮询是消费者处理的核心。消费者订阅主题后,轮询就会处理所有的细节。如:群组协调、再均衡、发送心跳、获取数据。如:

try{
	// 不停轮询
	while(true){
		// 不停的从 kafka 取数据,否则kafka会认为该消费者挂掉,然后从群组去除它
		ConsumerRecords<String, String> records = consumer.poll(100);
		// 遍历取到的值,值里面包括了 topic、分区、偏移量、键值对
		for(ConsumerRecords<String, String> record :  records){
			// TODO: 获取各值并进行逻辑处理
			String loopTopic = record.topic();
			String loopPartition = record.partition();
			String loopOffset = record.offset();
			String loopKey = record.key();
			String loopValue = record.value();
		}
	}
}finally{
	// 关闭消费者.此时会触发再均衡
	consumer.close();
}

提交和偏移量

在轮询中,每次调用 poll()方法,它返回的总是被写入 kafka 但还没被消费者读取过的消息。那么,我们就需要知道哪些消息是被消费过的。消费者消费一条消息后,都需要向 kafka 更新一下分区的偏移量信息,表示数据已经被消费到这里了。下次就返回后面未消费过的数据给消费者。这个过程叫提交

kafka 会往一个叫 consumer_offset的主题发送消息。消息里包括了每个分区的偏移量。如果消费者一直在 poll(),它会直接从分区一直取数据。不会读取该主题的内容。但如果发生了再均衡(有消费者挂掉或有新的消费者加入)。消费者可能会被分配到新的分区,它就需要从该主题中获取偏移量,否则它不知道从哪个位置开始读取消息。如果消费者未及时提交偏移量或者提交失败,这时候就会有问题了。

自动提交

最简单的方案就是让消费端自动提交。这需要把配置文件中的 enable.auto.commit设置为 true。这时,每过 5秒,消费者会自动把 poll()方法接收到的最大偏移量提交上去。这个 5s是受配置文件中 auto.commit.interval.ms控制的,默认值是 5秒。在轮询时,每次都会先检查是否需要提交了。如果达到提交要求,会先把上次轮询的最大偏移量先提交,然后再 poll()。

自动提交的问题是,它只能定期提交。在两次提交之间,如果发生故障,导致一部分消息消费了但还没来得及提交。再均衡后,就会重复消费一部分数据。这种场景不能完全避免,只能通过减少提交间隔来缓解,另外需要业务上做重复性校验

提交当前偏移量

为了避免定期提交带来的问题,也可以手动进行提交。在每次 poll() 及逻辑处理完毕后,手动提交一次:

try{
	consumer.commitSync();
}catch(CommitFailedException e){
	log.error("offset commit error: ", e);
}

使用手动提交时,要将配置文件中的 enable.auto.commit设置为 false。手动提交会在失败时自动重复尝试,直到成功。如果发生异常,我们可以进行异常捕获,记录日志

异步提交

手动提交有个劣势就是:在提交时,程序会阻塞。这会减少程序的吞吐量。这时候可以使用异步提交:

	consumer.commitAsync();

异步提交的问题在于,无法保证顺序。特别是在并发时,一个请求发送了 2000 的偏移量,服务端由于网络原因未收到,但另外一个请求发送了 3000 。这时候 2000 的请求成功了,就会覆盖 3000。如果再均衡,就会重复消费了。

组合提交

为了确保能够成功提交。可以采用如下方式:

try{
	// TODO: 逻辑处理
	consumer.commitAsync();
}catch(Exception e){
	log.error("Offset Commit Error!", e);
}finally{
	try{
		consumer.commitSync();
	}finally{
		consumer.close();
	}
}

提交特定偏移量

通常,提交偏移量的频率和处理消息批次的频率是一样的。但如果一次 poll() 获取了大量的数据,为了避免因再均衡引起重复处理的问题,需要在批次中间处理的时候就提交偏移量。还有一种情况:一个消费者要处理不同分区的数据,提交的时候要分别提交:

private Map<TopicPartition, OffsetAndMetadata> nowOffsets = new HashMap<>();
int count = 0;

while(true){
	ConsumerRecords<String, String> records = consumer.poll(100);
	
	// 不同分区的偏移量分别保存
	for(ConsumerRecord<String, String> record : records){
		nowOffsets.put(new TopicPartition(record,topic(), record.partition()), 
			new OffsetAndMetadata(record.offset() + 1, "no metadata")
		);
		
		// 每处理 1000 条消息,提交一次
		if(count % 1000 == 0){
			consumer.commitAsync(nowOffsets, null);
		}
		count++;
	}
}

再均衡监听

消费者在退出和进行分区再均衡前,会做一些清理工作。所以,各个消费者一定要及时提交最后一个处理记录的偏移量。

所以,需要各个消费者监听分区动作。及时提交最后处理记录的偏移量。这里,需要我们实现 ConsumerRebalanceListener接口,在轮询中订阅该主题。并填充相应的逻辑。

private Map<TopicPartition, OffsetAndMetadata> nowOffsets = new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener{
	public void onPartitionsAssigned(Collection<TopicPartition> partitions){}
	
	public void onPartitionsRevoked(Collection<TopicPartition> partitions){
		consumer.commitSync(nowOffsets);
	}
}
try{
	consumer.subscribe(topics, new HandleRebalance());
	
	while(true){
		ConsumerRecords<String, String> records = consumer.poll(100);
		
		// 不同分区的偏移量分别保存
		for(ConsumerRecord<String, String> record : records){
			nowOffsets.put(new TopicPartition(record,topic(), record.partition()), 
				new OffsetAndMetadata(record.offset() + 1, "no metadata")
			);
			
			// 每处理 1000 条消息,提交一次
			if(count % 1000 == 0){
				consumer.commitAsync(nowOffsets, null);
			}
			count++;
		}
	}
}catch(Exception e){
	log.error("Offset Commit Error!", e);
}finally{
	try{
		consumer.commitSync(nowOffsets);
	}finally{
		consumer.close();
	}
}

从特定偏移量读取


consumer.seek(partition, xxx); ```

退出消费

如果想要让某消费者退出轮询。可以在外部再执行一个程序,调用 consumer.wakeup()

参数配置

fetch.min.bytes

消费者从服务器获取记录的最小字节数。broker 收到消费者的数据请求时,如果可用数据量小于配置的值,它会等到有足够的可用数据再返回。这会降低工作负载。

fetch.max.wait.ms

配置 broker 的等待时间。如果broker等一定的时间后,还是没有足够的数据。则会直接将当前可用的数据返回给客户端。

max.partition.fetch.bytes

服务器从每个分区里返回给消费者的最大字节数。默认值是1MB。也就是说: poll() 方法返回的数据总量不会超过该配置的值。

session.timeout.ms

指定消费者在被认为死亡前可以与服务器断开连接的最长时间。默认是 3S。如果在该时间内未向群组协调器发送心跳,就会被认为死亡,这时候会进行再均衡。heartbeat.interval.ms表示多久向协调器发送一次心跳。所以,该值要小于 timeout 的值。

auto.offset.reset

指定消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该做何处理。默认值是 latest表示从最新的记录开始读取。earliest表示从起始位置读取。

enable.auto.commit

是否自动提交偏移量,通常把它设置为 false,然后在程序中自己控制提交。

partition.assignment.strategy

将分区分配给消费者时采用的策略。有两个选项:

  1. Range。将连续的分区分配给消费者。如果有两个消费者,C1,C2同时订阅两个主题T1,T2。每个主题都有 3 个分区。这时候分区分配可能是:C1 分配 T1 和 T2的1,2分区。C2分配到T1,T2的3分区。因为它会将连接的分区分给同一个消费者。如果分区个数是奇数,就会造成这种不一致。
  2. RoundRobin。把所有的分区逐个分配给消费者。同上场景,分配情况可能是:C1分到T1的1分区,C2分到2分区,C1再分到3分区。这样逐个分配。

数据复制、选举

复制功能是 Kafka 可用性的核心。它可以在个别节点失效时仍能保证集群的可用性和持久性。

Kafka 使用 topic 来组织数据。每个 topic 被分为若干个分区,每个分区有 N 个副本(可配置)。这些副本被保存在各个 broker 上,每个 broker 可以保存成百上千个不同 topic 和分区的副本。

副本有两种:master 和 slaver。为了保证数据的一致性,所有生产者和消费者的请求都会经过 master 副本。除 master 副本之外的,都是 slaver ,它们不响应来自客户端的请求,它们唯一的任务就是接收来自 master 的复制消息,保持和 master 的一致。如果 master 崩溃,会从 slaver 中选取一个作为新的 master。

master 另一个重要的任务就是弄清楚哪些 slaver 是和自己一致的。slaver 为了和 master 保持一致,在有新消息到达 master 时会尝试去复制消息。这种请求和客户端消费数据是一样的,master 将响应返回给 slaver。master 通过请求的偏移量是不是最新的值来判断 slaver 是否和自己一致。

把消息写入多个副本可以使得 Kafka 在崩溃时仍能保证消息的持久性。Kafka 的主题被分为多个分区,分区是基本的数据块。分区存储在单个磁盘上,kafka可以保证分区里的消息是有序的。每个分区有多个副本,一个是 master,其它是 slaver。

对于 slaver 来说,怎么判定它和 master 是同步的。有几个标准:

  1. 和 Zookeeper 之间有活跃的对话:在过去的 6s(可配置)内向 zk 发送过心跳
  2. 在过去 10s(可配置)从 master 获取过消息
  3. 在过去 10s从master获取过最新的消息,且几乎零延迟

Similar Posts

上一篇 分布式疑点

评论