本文概览:总结一些kafka常见的问题

1 消费消息的阻塞模式

1.1 问题描述

问题复现:

STEP1:通过poll获取一批消息M1M2…M5,假设成功处理了M1M2M3失败了。

STEP2:  第二次执行poll时,就是M6/M7了。

1.2 问题分析和解决

对于某些业务,上面错误是不准许的,如收益结算业务。必须保证第二次执行poll操作时,从M3开始,即M3M4

此时需要借助seek操作来完成,每次拉取都是从上次失败位置开始拉取消息

2 一个消费者监听多个topic

2.1 问题描述

消费者已经通过seek操作实现了阻塞属性,但是发现topic一直有lag。正常情况下如果存在lag大于0,在阻塞模式下,会一直拉取消息的,查看消费日志没有出现错误日志,给人第一感觉像是消费者线程卡死了。

2.2 问题分析

通过将日志级别设置为false,查看消费者日志,发现消费者一直在运行,只是拉取消息的起始位置是579,但是kafka服务器记录的logsize579offset=565Lag=14,所以此时无法拉取565之后的数据。

1

通过上面日志看到,消费者线程没有卡死,还在运行,只是拉取位置不正确,可能有两种原因:

  • commit没有成功。此时会造成本地位置和服务器的offset位置不同
  • seek没有起作用

再次确认消费者消费消息逻辑:

发现在消费者监听多个topic情况下,假设一次处理:T1-M4、T1-M5、T2-M4、T2-M5、T2-M6、T1-M6。当处理到T2-M5失败了,此时不会commit上面执行过的消息,并且此时会执行seek操作,但是seek只是操作了topic2,没有操作T1。此时重新拉取消息时,就会有问题:T2继续从T2-M5继续拉取,但是对于topic1,是从T1-M6位置开始拉取了,正常情况下对于topic1,应该是从T1-M5位置开始的。

这样可能出现多次,造成T1一直没有执行commit,但是又没有执行seek,直到T1-Mn是最后一条消息,此时就出现了上面日志中问题。

2.2 问题解决

在拉取到消息之后,首先对消息按照topic-partion分组。然后对每一个分组一次进行上面的处理:

3 一直重复拉取消息

1、问题描述

先看一段代码

这样会出现如下问题:

假设拉取100条消息,在第10条失败,抛异常,此时线程池阻塞队列中可能还是有90个线程,此时又重新拉取100条消息,假设此时第一条消息从超时异常,那么此时会再拉取100条消息,如此反复,阻塞队列中会有大量待处理的消息。

2、解决问题

对于

修改为如下,这样可以保证当前阻塞队列中线程都处理完成只有,再去拉取新的消息。

4 监控

1、常常用到监控指标

logSize

lag

offset

注意:只有当一个topic被消费者群组监听之后,才有lag、offset的概念。一个topic-partion对应多个consumer-group,就对应多个lag和offset。

分类&标签