admin管理员组文章数量:1641873
在原有pom.xml依赖下新添加一下kafka依赖ar包
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.0.1</version>
</dependency>
application.properties:
#原始数据kafka读取
kafka.consumer.servers=IP:9092,IP:9092(kafka消费集群ip+port端口)
kafka.consumer.enable.automit=true(是否自动提交)
kafka.consumer.session.timeout=20000(连接超时时间)
kafka.consumer.automit.interval=100
kafka.consumer.auto.offset.reset=latest(实时生产,实时消费,不会从头开始消费)
kafka.consumer.topic=result(消费的topic)
kafka.consumer.group.id=test(消费组)
kafka.consumer.concurrency=10(设置消费线程数)
#协议转换后存储kafka
kafka.producer.servers=IP:9092,IP:9092(kafka生产集群ip+port端口)
kafka.producer.topic=result(生产的topic)
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
springboot生产者配置:
package com.mapbar.track_storage.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafkamon.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* kafka生产配置
* @author Lvjiapeng
*
*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.producer.servers}")
private String servers;
@Value("${kafka.producer.retries}")
private int retries;
@Value("${kafka.producer.batch.size}")
private int batchSize;
@Value("${kafka.producer.linger}")
private int linger;
@Value("${kafka.producer.buffer.memory}")
private int bufferMemory;
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerCon
本文标签: 生产者详解消费者SpringBootKafka
版权声明:本文标题:springboot配置kafka生产者和消费者详解 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dianzi/1729330802a1196370.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论