admin管理员组文章数量:1648359
前言
因工作需要,需在系统利用Kafka监听接口,实现消息队列中,对消息的消费,首选Kafka,因为看中其超高的吞吐量。
基本概念
- 1 Producer: 特指消息的生产者
- 2 Consumer :特指消息的消费者
- 3 Consumer Group :消费者组,可以并行消费Topic中partition的消息
- 4 Broker:缓存代理,Kafa 集群中的一台或多台服务器统称为 broker。
- 5 Topic:特指 Kafka 处理的消息源(feeds of messages)的不同分类。
- 6 Partition:Topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)
- 7 Message:消息,是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息
- 8 稀疏索引:采用稀疏索引的方式,利用二分查找,定位消息。
集成Spring
- 添加Maven依赖
由于项目使用Maven进行管理,引入Kafka-Spring相关Jar包,需要添加依赖,此处使用的是Kafka0.10.2
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.2.2.RELEASE</version>
</dependency>
1 版本兼容性
配置完Maven依赖以后,还需要确认,因为Kafka与Spring有依赖关系,需要确定Spring的版本是否能和Kafka0.10.2完美兼容,查阅Spring For Apache Kafka 文档可知:
Compatibility- Apache Kafka 0.10.2.0
- Tested with Spring Framework version dependency is 4.3.7 but it is expected that the framework will work with earlier versions of Spring.
- Annotation-based listeners require Spring Framework 4.1 or higher, however.
- Minimum Java version: 7.
Kafka 0.10.2 需要SpringFrameWork 4.3.7,但后续会逐渐兼容SpringFrameWork更早期的版本,实践发现,Kafka的生产者里面的api会受SpringFrameWork版本影响,而消费者无影响,因此,可以保持项目中原有springframework不变。
2 排除重复包
引入Maven依赖以后,Kafka的maven依赖,自动包含了springframework相关jar包,需要排除。
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>4.3.9.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.9.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>4.3.9.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>4.3.9.RELEASE</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
- 3 接口区别
Kafka消费者,实现有两种方式:client客户端和listener监听接口,这里因业务需要,采用监听接口的方式实现,Spring提供了四种接口,如下所示:
public interface MessageListener<K, V> {} 1
void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> {} 2
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
public interface BatchMessageListener<K, V> {} 3
void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> {} 4
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
对应的解释如下
1、Use this for processing individual ConsumerRecord s received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods.
使用MessageListener接口实现时,当消费者拉取消息之后,消费完成会自动提交offset,即enable.automit为true时,适合使用此接口
2、Use this for processing individual ConsumerRecord s received from the kafka consumer poll() operation when using one of the manual commit methods.
使用AcknowledgeMessageListener时,当消费者消费一条消息之后,不会自动提交offset,需要手动ack,即enable.automit为false时,适合使用此接口
3、Use this for processing all ConsumerRecord s received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods. AckMode.RECORD is not supported when using this interface since the listener is given the complete batch.
4、Use this for processing all ConsumerRecord s received from the kafka consumer poll() operation when using one of the manual commit methods.
BatchMessageListener和BatchAcknowledgingMessageListener接口作用与上述两个接口大体类似,只是适合批量消费消息决定是否自动提交offset
由于业务较重,且offset自动提交时,出现消费异常或者消费失败的情况,消费者容易丢失消息,所以需要采用手动提交offset的方式,因此,这里实现了AcknowledgeMessageListener接口。
Spring配置文件
配置思路:
1、确定需要定义的beans:
- 1 consumerProperties 消费者的基本属性,包括指定bootstrap.servers,group.id等
- 2 consumerFactory :消费者工厂,配置完consumerProperties 后,需要将consumerProperties 作为参数,配置进consumerFactory中
- 3 containProperties: 消费者容器属性对象的bean,这个bean会指定后续自定义的监听接口bean及ackMode(手动提交时,采取什么提交方式)
- 4 messageListenerContainer:消费者容器,启动监听接口的bean,需要将先前定义的consumerFactory 、containProperties配置进这个bean,并定义其init-method = doStart,在启动spring时,便会自动启动监听接口,同时,此bean指定了topic
- 5 kafkaMessageListener:监听接口,这个接口由自己定义,需要将其配置进containProperties中,
具体完整消费者的配置文件如下所示:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework/schema/beans"
xmlns:xsi="http://www.w3/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework/schema/beans http://www.springframework/schema/beans/spring-beans.xsd">
<!--1、consumer属性配置,hashMap-->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"/>
<entry key="group.id" value="${kafka.group.id}"/>
<entry key="enable.automit" value="false"/>
<entry key="session.timeout.ms" value="15000"/>
<!--<entry key="auto.offset.reset" value="earliest"/>-->
<entry key="key.deserializer" value="org.apache.kafkamon.serialization.StringDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafkamon.serialization.StringDeserializer"/>
<entry key="value.deserializer.encoding" value="UTF8"/>
<entry key="value.deserializer.encoding" value="UTF8"/>
</map>
</constructor-arg>
</bean>
<!--2、Kafka消费者工厂,DefaultKafkaConsumerFactory-->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties"/>
</constructor-arg>
</bean>
<!--3、监听接口,AcknowledgingMessageListener-->
<bean id="kafkaMessageListener" class="com.lianjia.bigdata.dataarch.auth.kafka.KafkaMessageListener">
<property name="threadPool" ref="kafkaWorkerThreadPool"/>
</bean>
<bean id="kafkaWorkerThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="20"/>
<property name="maxPoolSize" value="200"/>
<property name="queueCapacity" value="500"/>
<property name="keepAliveSeconds" value="1800"/>
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
</property>
</bean>
<!--4、Kafka消费者容器,属性配置-->
<bean id="containProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="${kafka.topic}"/>
<property name="ackMode" value="MANUAL_IMMEDIATE"/>
<property name="messageListener" ref="kafkaMessageListener"/>
</bean>
<!--5、Kafka消费者容器-->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart" >
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containProperties"/>
</bean>
</bean>
示例代码
写了个简单的测试用例
生产者:
实现每秒定时向brokers发送一条消息
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafkamon.serialization.IntegerSerializer;
import org.apache.kafkamon.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
public class SimpleKafkaProducer implements Runnable {
protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaProducer.class);
@Override
public void run() {
Map<String, Object> sendProps = senderProps();
Producer producer = new KafkaProducer(sendProps);
Integer currentNum = 0;
try {
LOGGER.info("start produce message");
while (true){
ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>("testTopic",currentNum, currentNum);
producer.send(producerRecord);
LOGGER.info("send message:" + currentNum + " And value is " + producerRecord.value());
currentNum++;
Thread.sleep(1000);
}
}catch (Exception e){
LOGGER.error("send message fail", e);
}finally {
producer.close();
}
}
public static void main(String[] args) {
SimpleKafkaProducer simpleKafkaProducer = new SimpleKafkaProducer();
new Thread(simpleKafkaProducer).start();
}
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.33.106.94:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
}
消费者
public class KafkaMessageListener implements AcknowledgingMessageListener<Integer, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);
@Override
public void onMessage(final ConsumerRecord<Integer, String> message, final Acknowledgment acknowledgment) {
//TODO 这里具体实现个人业务逻辑
// 最后 调用acknowledgment的ack方法,提交offset
acknowledgment.acknowledge();
}
}
消费者使用示例:这里参考spring官方文档,简单实现了一个消费者监听接口示例
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafkamon.serialization.IntegerDeserializer;
import org.apache.kafkamon.serialization.StringDeserializer;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
public class SimpleKafkaConsumer extends SpringUnitTest {
protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaConsumer.class);
@Resource(name = "kafkaMessageListener")
private KafkaMessageListener kafkaMessageListener;
@Test
public void TestLinstener(){
ContainerProperties containerProps = new ContainerProperties("testTopic");
containerProps.setMessageListener(kafkaMessageListener);
KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
container.setBeanName("messageListenerContainer");
container.start();
}
private static KafkaMessageListenerContainer<Integer, String> createContainer(
ContainerProperties containerProps) {
Map<String, Object> props = consumerProps();
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(props);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
}
private static Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.33.106.94:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test3");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
实现acknowledgeMessageListener接口之前,查阅了网上现有的文档,结果不尽如人意,只能试着自己去参考spring官方文档,慢慢摸索,最终实现手动提交offset的监听接口,当然,Kafka的知识点,远不止这些,后续还将继续学习。
本文标签: 接口KafkaSpringAcknowledgeMessageListener
版权声明:本文标题:Kafka集成Spring-AcknowledgeMessageListener接口实现 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dongtai/1729494088a1202737.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论