目录
本节概览:spring提供了两种整合kafka的方式:spring-integration-kafka 和 spring-kafka,本节只对spring-kafka进行介绍。
1 应用实例
1.1 maven配置
spring-kafka的版本为2.1.1.RELEASE
1 2 3 4 5 |
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.1.1.RELEASE</version> </dependency> |
对应spring的版本是5.0.3.RELEASE
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>5.0.3.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.0.3.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>5.0.3.RELEASE</version> </dependency> <dependency> <artifactId>spring-test</artifactId> <groupId>org.springframework</groupId> <version>5.0.3.RELEASE</version> <scope>test</scope> </dependency> |
spring-kafka和kafak-clients版本映射关系,以及spring-integration和kafka-client的映射关系如下图
1.2 生产者实例
1.2.1 配置文件
定义一个producer.xml文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd"> <context:component-scan base-package="producer" /> <bean id="producerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="localhost:9092"/> <entry key="acks" value="all"/> <entry key ="retries" value="0"/> <entry key="batch.size" value="16384"/> <entry key="buffer.memory" value="33554432"/> <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/> </map> </constructor-arg> </bean> <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> <constructor-arg> <ref bean="producerProperties"/> </constructor-arg> </bean> <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> <constructor-arg ref="producerFactory"/> <constructor-arg name="autoFlush" value="true"/> <property name="defaultTopic" value="myTopic"/> </bean> </beans> |
如果需要支持事务,则需要进行配置一个transactionIdPrefix的参数。
1 2 3 4 5 6 |
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> <constructor-arg> <ref bean="producerProperties"/> </constructor-arg> <property name= "transactionIdPrefix" value="test-demo" /> </bean> |
2、application.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 |
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd"> <import resource="classpath:spring/producer.xml"/> </beans> |
1.2.2 定义一个ProducerClient
1 2 3 4 5 6 7 8 9 10 |
@Component public class ProducerClient { @Autowired private KafkaTemplate<String,String> kafkaTemplate; public void sendMessage(String topicName,String message){ kafkaTemplate.send(topicName,message); } } |
1.2.3 测试
1 2 3 4 5 6 7 8 9 10 |
public class Excutor { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/applicationContext.xml"); System.out.printf("启动Producer"); ProducerClient producerClient = (ProducerClient) context.getBean("producerClient"); producerClient.sendMessage("test","data:20180329"); } } |
1.3 单个消费者实例
分为两种类型:单个消费实例和多个消费者实例。
1.3.1 配置文件
定义一个listener.xml文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd"> <context:component-scan base-package="listener" /> <!--<context:component-scan base-package="concurrent" />--> <bean id="consumerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="localhost:9092"/> <entry key="group.id" value="group1"/> <entry key="enable.auto.commit" value="true"/> <entry key="auto.commit.interval.ms" value="1000"/> <entry key="session.timeout.ms" value="30000"/> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> </map> </constructor-arg> </bean> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg> <ref bean="consumerProperties"/> </constructor-arg> </bean> <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> <constructor-arg value="test"/> <property name="messageListener" ref="kafkaConsumerListener"/> </bean> <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" > <constructor-arg ref="consumerFactory"/> <constructor-arg ref="containerProperties"/> </bean> </beans> |
如果需要配置事务,实现Consume-transform-Prodcue,则如下配置一个kafkaTransactionMaanager
1 2 3 4 5 6 7 8 9 10 11 12 |
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> <constructor-arg value="test"/> <property name="messageListener" ref="kafkaConsumerListener"/> <property name="transactionManager" ref="transactionManager"> </bean> <bean id="kafkaTransactionManager" class="org.springframework.kafka.transaction.KafkaTransactionManager"> <constructor-arg> <ref bean="producerFactory"/> </constructor-arg> </bean> |
如果需要配置一个正则表达式的主题,指定一个Pattern对象
1 2 3 4 5 6 7 8 |
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> <constructor-arg ref="partern"/> </bean> <bean class="java.util.regex.Pattern" factory-method="compile"> <constructor-arg value=".*some pattern.*" /> <constructor-arg type="int" value="#{T(java.util.regex.Pattern).DOTALL | T(java.util.regex.Pattern).CASE_INSENSITIVE}" /> </bean> |
1.3.2 定义一个messagListener
在上面配置文件中构建ContainerProperties时的属性messaeListener指定。
1 2 3 4 5 6 7 8 9 |
@Component public class KafkaConsumerListener implements MessageListener<String, String> { public void onMessage(ConsumerRecord<String, String> integerStringConsumerRecord) { System.out.printf("offset= %d, key= %s, value= %s\n", integerStringConsumerRecord.offset(), integerStringConsumerRecord.key(), integerStringConsumerRecord.value()); } } |
1.3.3 测试
1 2 3 4 5 6 7 8 9 |
public class Excutor { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/applicationContext.xml"); System.out.printf("启动listener"); while (true) { } } } |
此时可以向kafka发送消息(发送消息可以直接使用kafka自带的脚本进行),然后就可以看到日志了:
08:38:58.598 [messageListenerContainer-C-1] DEBUG org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer – Received: 2 records
offset= 51, key= null, value= message-1
offset= 52, key= null, value= message-2
1.4 多个消费者实例
通过定义一个ConcurrentMessageListenerContainer来替换上面的KafkaMessageListenerContainer
如上面listener.xml配置中
1 2 3 4 |
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" > <constructor-arg ref="consumerFactory"/> <constructor-arg ref="containerProperties"/> </bean> |
替换为如下,配置效果就是相当于部署了2个消费者。
1 2 3 4 5 |
<bean id="concurrentMessageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" > <constructor-arg ref="consumerFactory"/> <constructor-arg ref="containerProperties"/> <property name="concurrency" value="2"/> </bean> |
2 实现生产者
为了执行生产者发送消息,spring-kafka提供了一个KafkaTemplate,让我们只关心 send 消息不需要再关注创建producer和关闭producer,还提供了事务的相关操作(Kafka从0.11.0.0版本以上)。
2.1发送消息
提供了如下接口,可以支持发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
ListenableFuture<SendResult<K, V>> sendDefault(V data); ListenableFuture<SendResult<K, V>> sendDefault(K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, V data); ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data); LitenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data); ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record); ListenableFuture<SendResult<K, V>> send(Message<?> message); |
除了上面send操作还提供了如下操作,支持如下接口发送消息
1 |
<T> T execute(ProducerCallback<K, V, T> callback); |
关于ProducerCallback的代码如下:
1 2 3 |
interface ProducerCallback<K, V, T> { T doInKafka(Producer<K, V> producer); } |
2.2 事务相关操作
提供了如下两个支持事务的操作:
1 2 3 |
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets); void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId); |
3 实现消费者
如果没有spring-kafak,我们需要自己创建消费者、还需要有个定时任务保证消费者一直运行获取消息、处理消息之后提交偏移量以及事务一致性(Kafka从0.11.0.0版本以上)等操作。spring-kafka封装了这些操作,只需要使用者关注消费消息的逻辑。一个流程图如下:
3.1 创建消费者
spring-kafka可以创建单个消费者,而且还支持创建多个消费者:
1、创建单个消费者
KafkaMessageListenerContainer 创建一个消费者,支持创建两种类型消费者:消费者群组类型和独立消费者类型。
2、创建多个消费者
ConcurrentMessageListenerContainer可以创建多个消费者,可以通过并发数属性来设置创建多少个消费者,ConcurrentMessageListenerContainer作用其实和部署多个消费者服务是一样的效果。支持创建两种类型消费者:消费者群组类型和独立消费者类。
(1)关于ConcurrentMessageListenerContainer的并发数说明
这个并发数是指一个app实例创建多少个消费者。如果我们部署多个服务,比如10台服务,而分区个数时20,此时可以设置并发数是2,如果分区个数小于10,此时设置并发是1就可以了。
注意:消费消息时使用多线程消费消息, 这种多线程处理可以在自己实现消费消息逻辑中自己实现。=
3.2 定时任务
KafkaMessageListenerContainer实现了Lifecycle接口的start方法,所以在上下文初始化时会调用这个对象的start方法。
1 2 3 4 5 6 7 |
@Override public final void start() { .... // 启动 doStart(); .... } |
KafkaMessageListenerContainer通过doStart方法定义了处理消息逻辑,所以启动任务的入口可以看成是:
1 |
KafkaMessageListenerContainer#doStart() |
3.3 处理消息模板
KafkaMessageListnerContailer#ListenerConsumer#run中实现了处理一个消息泛型逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
while(true){ // 处理提交策略,可以概况为两类:根据时间和次数。 // (1)个数策略:消息个数处理超过阈值ackcOunt // (2)时间策略:执行时间超过阈值ackTime // 把acks队列中只转移到offesets中值,如果提交策略满足就处理offsets中值。 processCommits(); // 执行seek操作。处理offsets队列中值,依次进行seek操作,保证在提交出现问题时,不重复消费消息。 processSeeks(); // 获取消息 ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout()); // 通过listner处理消息。根据不同listenerType调用相关接口处理消息。如果提交策略AckMode#Record模式,则立刻执行commit,否 则添加便宜量信息到ack队列。 invokeListener(records); } |
3.4 用户只关注处理消息逻辑
在使用spring-kafka,我们只需要关注处理消息逻辑,通过继承实现MessageListener来实现处理消息的逻辑,主要包含四种类型Listener:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
public enum ListenerType { /** * Acknowledging and consumer aware. */ ACKNOWLEDGING_CONSUMER_AWARE, /** * Consumer aware. */ CONSUMER_AWARE, /** * Acknowledging. */ ACKNOWLEDGING, /** * Simple. */ SIMPLE } |
- SIMPLE,就是在处理消息时,不需要考虑提交偏移量和使用Consumer对象。
1 |
void onMessage(T data); |
- ACKNOWLEDGING, 当需要手动提交时时,而不是自动提交或者spring-kafka自己实现提交的方式时,需要如下接口中acknowledment的acknowlegge()方法来提交偏移量
1 2 3 |
default void onMessage(T data, Acknowledgment acknowledgment) { throw new UnsupportedOperationException("Container should never call this"); } |
- CONSUMER_AWARE,类似Spring IOC 的ApplicationContextAware功能,如果我们在消费消息时,需要用到consumer对象,则需要使用这个类型。
1 2 3 |
default void onMessage(T data, Consumer<?, ?> consumer) { throw new UnsupportedOperationException("Container should never call this"); } |
- ACKNOWLEDGING_CONSUMER_AWARE,同时支持ACKNOWLEDGING和CONSUMER_AWARE两种类型。
1 2 3 |
default void onMessage(T data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) { throw new UnsupportedOperationException("Container should never call this"); } |
如何确定ListenerType类型?可以根据继承不同类型MeassgeListener可以确定ListenerType:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
public final class ListenerUtils { private ListenerUtils() { super(); } public static ListenerType determineListenerType(Object listener) { Assert.notNull(listener, "Listener cannot be null"); ListenerType listenerType; if (listener instanceof AcknowledgingConsumerAwareMessageListener || listener instanceof BatchAcknowledgingConsumerAwareMessageListener) { listenerType = ListenerType.ACKNOWLEDGING_CONSUMER_AWARE; } else if (listener instanceof ConsumerAwareMessageListener || listener instanceof BatchConsumerAwareMessageListener) { listenerType = ListenerType.CONSUMER_AWARE; } else if (listener instanceof AcknowledgingMessageListener || listener instanceof BatchAcknowledgingMessageListener) { listenerType = ListenerType.ACKNOWLEDGING; } else if (listener instanceof GenericMessageListener) { listenerType = ListenerType.SIMPLE; } else { throw new IllegalArgumentException("Unsupported listener type: " + listener.getClass().getName()); } return listenerType; } |
如何使用ListenerType?在KafkaMessageLisnerContailer中根据不同listenerType来处调用不同接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record, @SuppressWarnings("rawtypes") Producer producer, Iterator<ConsumerRecord<K, V>> iterator) throws Error { try { switch (this.listenerType) { case ACKNOWLEDGING_CONSUMER_AWARE: this.listener.onMessage(record, this.isAnyManualAck ? new ConsumerAcknowledgment(record) : null, this.consumer); break; case CONSUMER_AWARE: this.listener.onMessage(record, this.consumer); break; case ACKNOWLEDGING: this.listener.onMessage(record, this.isAnyManualAck ? new ConsumerAcknowledgment(record) : null); break; case SIMPLE: this.listener.onMessage(record); break; } ... } |
3.5 提交策略
spring-kafka负责提交策略和自己手动提交,可以参考AckMode
1 2 3 4 5 6 7 8 9 |
public enum AckMode { RECORD, //每处理一个消息,就提交一次 BATCH, //将上一次poll得到消息进行提交 TIME, //达到段时间间隔,就提交 COUNT, //达到次数,就提交 COUNT_TIME, //达到次数和一段时间间隔,就提交 MANUAL // 手动提交 MANUAL_IMMEDIATE //每处理一条消息就提交一次 } |
具体可以分为两类:
1、spring-kafak负责提交
(1)RECORD, 每处理一个消息,就提交一次。和MANUAL_IMMEDIATE类似,只是这里是通过spring-kafka自己操作,而MsManualImmediateAck需要我们自己在代码里面调用Acknowledgment#acknowledge
(2)BATCH, 将上一次poll得到消息进行提交。和MANUAL类似,BATH是通过spring-kafka操作,而MANUAL模式是用户自己通过调用Acknowledgment#acknowledge来执行。
(3)TIME, 达到段时间间隔,就提交
(4)COUNT, 达到次数,就提交
(5)COUNT_TIME, 达到次数和时间间隔两个条件,就提交
2、用户自己手动提交,通过在代码里面调用Acknowledgment#acknowledge()进行提交。
(1)MANUAL:调用Acknowledgment#acknowledge可以先放置到offts中,到时候在processCommits中进行处理。
(2) MANUAL_IMMEDIATE 处理完消息,调用Acknowledgment#acknowledge执行立刻提交。
在这种模式下,自己写提交偏移量逻辑,spring-kafka不负责提交而且也不是kafka的自动提交模式。此时可以继承ListenerType为ACKNOWLEDGING或者ACKNOWLEDGING_CONSUMER_AWARE类型的MessageListener,然后使用如下接口参数中acknowledgment#ackknowlege()方法来提交。
1 2 3 4 5 6 7 |
default void onMessage(T data, Acknowledgment acknowledgment) { throw new UnsupportedOperationException("Container should never call this"); } default void onMessage(T data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) { throw new UnsupportedOperationException("Container should never call this"); } |
举例如下:
1 2 3 4 5 6 7 8 9 |
public class AckDemo implements AcknowledgingMessageListener { public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) { // 1.处理消息 process(data); // 2.手动提交 acknowledgment.acknowledge(); } } |
3.6 事务性
事务性介绍可以参考 :
3.6.1 KafkaTransactionManager
1、doBegine执行初始化事务和开启事务:
在doBegin中通过如下 ProducerFactoryUtils#getTransactionalResourceHolder方法创建KafkaResourceHolder(当成数据库的Connectiion),并执行初始化事务(initTransactions())和开启事务(begineTransactions()),如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
==================================== 类名:ProducerFactoryUtils ==================================== /** * Obtain a Producer that is synchronized with the current transaction, if any. * @param producerFactory the ConnectionFactory to obtain a Channel for * @param <K> the key type. * @param <V> the value type. * @return the resource holder. */ public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder( final ProducerFactory<K, V> producerFactory) { Assert.notNull(producerFactory, "ProducerFactory must not be null"); // 1.对于每一个线程会生成一个唯一key,然后根据key去查找resourceHolder @SuppressWarnings("unchecked") KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager .getResource(producerFactory); if (resourceHolder == null) { // 2.创建一个消费者 Producer<K, V> producer = producerFactory.createProducer(); // 3.开启事务 producer.beginTransaction(); resourceHolder = new KafkaResourceHolder<K, V>(producer); bindResourceToTransaction(resourceHolder, producerFactory); } return resourceHolder; } |
创建生成者,并初始化事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
==================================== 类名:DefaultKafkaProducerFactory ==================================== protected Producer<K, V> createTransactionalProducer() { Producer<K, V> producer = this.cache.poll(); if (producer == null) { Map<String, Object> configs = new HashMap<>(this.configs); // 对于每一次生成producer时,都设置一个不同的transactionId configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement()); producer = new KafkaProducer<K, V>(configs, this.keySerializer, this.valueSerializer); // 1.初始化事务。 producer.initTransactions(); return new CloseSafeProducer<K, V>(producer, this.cache); } else { return producer; } } |
2、doCommit提交事务
1 2 3 4 5 6 7 |
protected void doCommit(DefaultTransactionStatus status) { @SuppressWarnings("unchecked") KafkaTransactionObject<K, V> txObject = (KafkaTransactionObject<K, V>) status.getTransaction(); KafkaResourceHolder<K, V> resourceHolder = txObject.getResourceHolder(); // 提交事务 resourceHolder.commit(); } |
通过KafkaResourceHolder的commit来提交事务。
1 2 3 |
public void commit() { this.producer.commitTransaction(); } |
3.6.2 实现提交偏移量事务操作
通过TransactionTemplate#excute来执行消费消息的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
public <T> T execute(TransactionCallback<T> action) throws TransactionException { Assert.state(this.transactionManager != null, "No PlatformTransactionManager set"); if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) { return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action); } else { TransactionStatus status = this.transactionManager.getTransaction(this); T result; try { // 1、处理消息逻辑和发送消息偏移量 result = action.doInTransaction(status); } catch (RuntimeException | Error ex) { // Transactional code threw application exception -> rollback rollbackOnException(status, ex); throw ex; } catch (Throwable ex) { // Transactional code threw unexpected exception -> rollback rollbackOnException(status, ex); throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception"); } // 2、提交事务 this.transactionManager.commit(status); return result; } } |
在上面的action.doIntransaction中在处理完成消息并调用这个操作发送偏移量
1 2 3 4 5 6 |
private void sendOffsetsToTransaction(Producer producer) { handleAcks(); Map<TopicPartition, OffsetAndMetadata> commits = buildCommits(); this.commitLogger.log(() -> "Sending offsets to transaction: " + commits); producer.sendOffsetsToTransaction(commits, this.consumerGroupId); } |
3.6.3 事务提交和提交策略比较
选择事务提交时,如果此时再设置成其他的提交策略AckMode中值:比如说依赖次数或者时间,那么这些设置也是不生效。
3.7 总结
在使用spring-kafka的消费者功能时,可以考虑ContainerProperties中常用属性有:
(1)设置主题
topics:主题
topicPattern:主题模式
topicPartions:包含topic/partions/initial offesets。
(2)提交策略的相关配置
目前分为三种:处理一条就提交、时间策略、次数策略、自己手动提交策略(spring-kafka不再负责提交)
ackMode:指定提交模式
ackCount:采用次数策略提交模式,需要指定个数阈值。
ackTime: 采用时间策略提交时,需要指定时间阈值
syncCommits 同步或异步提交
(3)事务属性的配置
transactionManager 指定一个KafkaTransactionManager对象,它封装了spring 事务管理器,实现kafka的事务功能。
(4)消费消息的业务逻辑
messageListener 需要实现MessageListener接口,实现处理消息具体业务逻辑
(5)消费者信息
groupId 消息分组
clientId 消费者名字的前缀
参考文献
官网:http://projects.spring.io/spring-kafka/
官网文档: https://docs.spring.io/spring-kafka/reference/htmlsingle/