本节概览:spring提供了两种整合kafka的方式:spring-integration-kafka 和  spring-kafka,本节只对spring-kafka进行介绍。

1 应用实例

1.1 maven配置

spring-kafka的版本为2.1.1.RELEASE

对应spring的版本是5.0.3.RELEASE

spring-kafka和kafak-clients版本映射关系,以及spring-integration和kafka-client的映射关系如下图

11

1.2 生产者实例

13

1.2.1 配置文件

定义一个producer.xml文件

如果需要支持事务,则需要进行配置一个transactionIdPrefix的参数。

2、application.xml

1.2.2 定义一个ProducerClient

1.2.3 测试

1.3 单个消费者实例

分为两种类型:单个消费实例和多个消费者实例。

1

1.3.1 配置文件

定义一个listener.xml文件

如果需要配置事务,实现Consume-transform-Prodcue,则如下配置一个kafkaTransactionMaanager

如果需要配置一个正则表达式的主题,指定一个Pattern对象

1.3.2 定义一个messagListener

在上面配置文件中构建ContainerProperties时的属性messaeListener指定。

1.3.3 测试

此时可以向kafka发送消息(发送消息可以直接使用kafka自带的脚本进行),然后就可以看到日志了:

08:38:58.598 [messageListenerContainer-C-1] DEBUG org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer – Received: 2 records
offset= 51, key= null, value= message-1
offset= 52, key= null, value= message-2

1.4 多个消费者实例

通过定义一个ConcurrentMessageListenerContainer来替换上面的KafkaMessageListenerContainer

如上面listener.xml配置中

替换为如下,配置效果就是相当于部署了2个消费者。

2 实现生产者

为了执行生产者发送消息,spring-kafka提供了一个KafkaTemplate,让我们只关心 send 消息不需要再关注创建producer和关闭producer,还提供了事务的相关操作(Kafka从0.11.0.0版本以上)。

2.1发送消息

提供了如下接口,可以支持发送消息

除了上面send操作还提供了如下操作,支持如下接口发送消息

   

关于ProducerCallback的代码如下:

2.2 事务相关操作

提供了如下两个支持事务的操作:

实现消费者

如果没有spring-kafak,我们需要自己创建消费者、还需要有个定时任务保证消费者一直运行获取消息、处理消息之后提交偏移量以及事务一致性(Kafka从0.11.0.0版本以上)等操作。spring-kafka封装了这些操作,只需要使用者关注消费消息的逻辑。一个流程图如下:

Snip20180504_61

3.1 创建消费者

spring-kafka可以创建单个消费者,而且还支持创建多个消费者:

1、创建单个消费者

KafkaMessageListenerContainer 创建一个消费者,支持创建两种类型消费者:消费者群组类型和独立消费者类型。

2、创建多个消费者

ConcurrentMessageListenerContainer可以创建多个消费者,可以通过并发数属性来设置创建多少个消费者,ConcurrentMessageListenerContainer作用其实和部署多个消费者服务是一样的效果。支持创建两种类型消费者:消费者群组类型和独立消费者类。

(1)关于ConcurrentMessageListenerContainer的并发数说明

这个并发数是指一个app实例创建多少个消费者。如果我们部署多个服务,比如10台服务,而分区个数时20,此时可以设置并发数是2,如果分区个数小于10,此时设置并发是1就可以了。

注意:消费消息时使用多线程消费消息, 这种多线程处理可以在自己实现消费消息逻辑中自己实现。=

3.2 定时任务

KafkaMessageListenerContainer实现了Lifecycle接口的start方法,所以在上下文初始化时会调用这个对象的start方法。

KafkaMessageListenerContainer通过doStart方法定义了处理消息逻辑,所以启动任务的入口可以看成是:

3.3 处理消息模板

KafkaMessageListnerContailer#ListenerConsumer#run中实现了处理一个消息泛型逻辑:

3.4 用户只关注处理消息逻辑

在使用spring-kafka,我们只需要关注处理消息逻辑,通过继承实现MessageListener来实现处理消息的逻辑,主要包含四种类型Listener:

  • SIMPLE,就是在处理消息时,不需要考虑提交偏移量和使用Consumer对象。

  • ACKNOWLEDGING, 当需要手动提交时时,而不是自动提交或者spring-kafka自己实现提交的方式时,需要如下接口中acknowledment的acknowlegge()方法来提交偏移量

  • CONSUMER_AWARE,类似Spring IOC 的ApplicationContextAware功能,如果我们在消费消息时,需要用到consumer对象,则需要使用这个类型。

  • ACKNOWLEDGING_CONSUMER_AWARE,同时支持ACKNOWLEDGING和CONSUMER_AWARE两种类型。

如何确定ListenerType类型?可以根据继承不同类型MeassgeListener可以确定ListenerType:

如何使用ListenerType?在KafkaMessageLisnerContailer中根据不同listenerType来处调用不同接口

3.5 提交策略

spring-kafka负责提交策略和自己手动提交,可以参考AckMode

具体可以分为两类:

1、spring-kafak负责提交

(1)RECORD,   每处理一个消息,就提交一次。和MANUAL_IMMEDIATE类似,只是这里是通过spring-kafka自己操作,而MsManualImmediateAck需要我们自己在代码里面调用Acknowledgment#acknowledge

(2)BATCH,      将上一次poll得到消息进行提交。和MANUAL类似,BATH是通过spring-kafka操作,而MANUAL模式是用户自己通过调用Acknowledgment#acknowledge来执行。

(3)TIME,    达到段时间间隔,就提交

(4)COUNT,    达到次数,就提交

(5)COUNT_TIME, 达到次数和时间间隔两个条件,就提交

2、用户自己手动提交,通过在代码里面调用Acknowledgment#acknowledge()进行提交。

(1)MANUAL:调用Acknowledgment#acknowledge可以先放置到offts中,到时候在processCommits中进行处理。

(2) MANUAL_IMMEDIATE  处理完消息,调用Acknowledgment#acknowledge执行立刻提交。

在这种模式下,自己写提交偏移量逻辑,spring-kafka不负责提交而且也不是kafka的自动提交模式。此时可以继承ListenerType为ACKNOWLEDGING或者ACKNOWLEDGING_CONSUMER_AWARE类型的MessageListener,然后使用如下接口参数中acknowledgment#ackknowlege()方法来提交。

举例如下:

3.6 事务性

事务性介绍可以参考 :

Kafka生产者事务和幂等

3.6.1 KafkaTransactionManager

1、doBegine执行初始化事务和开启事务:

在doBegin中通过如下 ProducerFactoryUtils#getTransactionalResourceHolder方法创建KafkaResourceHolder(当成数据库的Connectiion),并执行初始化事务(initTransactions())和开启事务(begineTransactions()),如下:

创建生成者,并初始化事务

2、doCommit提交事务

通过KafkaResourceHolder的commit来提交事务。

3.6.2 实现提交偏移量事务操作

通过TransactionTemplate#excute来执行消费消息的逻辑

在上面的action.doIntransaction中在处理完成消息并调用这个操作发送偏移量

3.6.3 事务提交和提交策略比较

选择事务提交时,如果此时再设置成其他的提交策略AckMode中值:比如说依赖次数或者时间,那么这些设置也是不生效。

3.7 总结

在使用spring-kafka的消费者功能时,可以考虑ContainerProperties中常用属性有:

(1)设置主题

topics:主题

topicPattern:主题模式

topicPartions:包含topic/partions/initial offesets。

(2)提交策略的相关配置

目前分为三种:处理一条就提交、时间策略、次数策略、自己手动提交策略(spring-kafka不再负责提交)

ackMode:指定提交模式

ackCount:采用次数策略提交模式,需要指定个数阈值。

ackTime:  采用时间策略提交时,需要指定时间阈值

syncCommits 同步或异步提交

(3)事务属性的配置

transactionManager 指定一个KafkaTransactionManager对象,它封装了spring 事务管理器,实现kafka的事务功能。

(4)消费消息的业务逻辑

messageListener 需要实现MessageListener接口,实现处理消息具体业务逻辑

(5)消费者信息

groupId 消息分组

clientId  消费者名字的前缀

参考文献

官网:http://projects.spring.io/spring-kafka/

官网文档: https://docs.spring.io/spring-kafka/reference/htmlsingle/

分类&标签