SpringBoot项目集成kafka及常规配置

chatgpt/2023/10/4 6:54:37

desc:

        使用 spring-kafka 的api,在springboot项目中集成kafka能力,封装配置。

0.引入依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

1.kafka相关配置

1.1 KafkaConfiguration 公共配置

@Data
@Configuration
public class KafkaConfiguration {/*** 主机地址*/@Value("${kafka.server-host}")private String bootstrapServers;/*** sasl 认证账号*/@Value("${iot.kafka.sasl.username:admin}")private String userName;/*** sasl 密码*/@Value("${iot.kafka.sasl.password:iot@2021}")private String password;}

1.2 KafkaConsumerConfiguration 消费者配置

@Data
@Configuration
public class KafkaConsumerConfiguration {/*** 默认组id*/@Value("${iot.kafka.consumer.properties.group-id:default-group}")private String groupId;@Value("${iot.kafka.consumer.properties.fetch-max-wait:5000}")private Integer fetchMaxWait;/*** 此设置限制每次调用poll返回的消息数,这样可以更容易的预测每次poll间隔要处理的最大值。通过调整此值,可以减少poll间隔,减少重新平衡分组的*/@Value("${iot.kafka.consumer.properties.max-poll-record:100}")private Integer maxPollRecordsConfig;/*** 增大poll的间隔,可以为消费者提供更多的时间去处理返回的消息(调用poll(long)返回的消息,通常返回的消息都是一批),缺点是此值越大将会延迟组重新平衡。*/@Value("${iot.kafka.consumer.properties.max-poll-interval-ms:100}")private Integer maxPollIntervalConfig;/*** 是否开启自动提交*/@Value("${iot.kafka.consumer.properties.enable-auto-commit:#{false}}")private boolean enableAutoCommitConfig;/*** 消费策略* earliest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费* latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据* none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常*/@Value("${iot.kafka.consumer.properties.auto-offset-reset:earliest}")private String autoOffsetResetConfig;@Value("${iot.kafka.consumer.properties.auto-commit-interval:1000}")private String autoCommitIntervalMsConfig;@Value("${iot.kafka.consumer.properties.session-timeout:30000}")private String sessionTimeoutMsConfig;}

1.3 KafkaListenerConfiguration 监听配置

@Data
@Configuration
public class KafkaListenerConfiguration {/*** 启用线程数(提高并发)*/@Value("${iot.kafka.listener.concurrency:3}")private Integer concurrency;/*** 手动提交的方式,当enable-auto-commit: false时起作用* manual:手动调用Acknowledgment.acknowledge()后立即提交* record:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交* batch:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交* time: 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交* count:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交* count_time:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交*/@Value("${iot.kafka.listener.ack-mode:manual_immediate}")private String ackMode;/*** 消费超时时间*/@Value("${iot.kafka.listener.poll-timeout:3000}")private Long pollTimeout;/*** 是否开启批量处理*/@Value("${iot.kafka.listener.batch_listener:#{true}}")private Boolean batchListener;}

1.4 KafkaProducerConfiguration 生产者配置

@Data
@Configuration
public class KafkaProducerConfiguration {/*** 重试次数 默认值0*/@Value("${iot.kafka.producer.retries:0}")private Integer retries;/*** acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为 - 1。* acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。* acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1 的设置。*/@Value("${iot.kafka.producer.acks:all}")private String acks;/*** 指定缓存的大小,生产者缓存每个分区未发送的消息。默认 16384*/@Value("${iot.kafka.producer.batch-size:16384}")private Integer batchSize;/*** 生产者发送请求之前等待一段时间,设置等待时间是希望更多地消息填补到未满的批中。 默认 30*/@Value("${iot.kafka.producer.properties.linger.ms:30}")private Integer lingerMs;/*** 通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的 默认32m*/@Value("${iot.kafka.producer.buffer-memory:33554432}")private Integer bufferMemory;}

2.工厂配置

2.1 ConsumerFactoryBuilder 消费者工厂

@Configuration
public class ConsumerFactoryBuilder {@Autowiredprivate KafkaConfiguration kafkaConfiguration;@Autowiredprivate KafkaConsumerConfiguration kafkaConsumerConfiguration;@Autowiredprivate KafkaListenerConfiguration kafkaListenerConfiguration;/*** 消费者配置** @return properties*/@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new ConcurrentHashMap<>();//配置地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBootstrapServers());//消费者组 默认组idprops.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerConfiguration.getGroupId());//是否开启自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConsumerConfiguration.isEnableAutoCommitConfig());/* 消费策略* earliest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费* latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据* none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常*/props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConsumerConfiguration.getAutoOffsetResetConfig());//消费者默认等待服务响应时间(毫秒)props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, kafkaConsumerConfiguration.getFetchMaxWait());props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConsumerConfiguration.getAutoCommitIntervalMsConfig());props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConsumerConfiguration.getSessionTimeoutMsConfig());props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerConfiguration.getMaxPollRecordsConfig());props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerConfiguration.getMaxPollIntervalConfig());//key序列化器选择props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化器选择props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//设置sasl认证props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='" + kafkaConfiguration.getUserName() + "' password='" + kafkaConfiguration.getPassword() + "';");return props;}/*** kafka消费者工厂*/@Beanpublic ConsumerFactory<Object, Object> consumerFactory() {return new DefaultKafkaConsumerFactory(consumerConfigs());}/*** 监听工厂*/@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//线程数factory.setConcurrency(kafkaListenerConfiguration.getConcurrency());//手动提交factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);//开启批量处理factory.setBatchListener(kafkaListenerConfiguration.getBatchListener());factory.getContainerProperties().setPollTimeout(kafkaListenerConfiguration.getPollTimeout());return factory;}}

2.2 ProducerFactoryBuilder 生产者工厂


@Configuration
public class ProducerFactoryBuilder {@Autowiredprivate KafkaConfiguration kafkaConfiguration;@Autowiredprivate KafkaProducerConfiguration kafkaProducerConfiguration;/*** 生产者配置** @return 配置*/@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>(11);//kafka server地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBootstrapServers());/** acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为 - 1。* acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。* acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1 的设置。*/props.put(ProducerConfig.ACKS_CONFIG, kafkaProducerConfiguration.getAcks());//消息发送失败重试次数props.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerConfiguration.getRetries());//去缓冲区中一次拉16k的数据,发送到brokerprops.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaProducerConfiguration.getBatchSize());// 批量发送,延迟为30毫秒,如果30ms内凑不够batch则强制发送,提高并发props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProducerConfiguration.getLingerMs());//设置缓存区大小 32mprops.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaProducerConfiguration.getBufferMemory());//key序列化器选择props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//value序列化器选择props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//设置sasl认证props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='" + kafkaConfiguration.getUserName() + "' password='" + kafkaConfiguration.getPassword() + "';");return props;}/*** Producer Template 配置*/@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {Map<String, Object> stringObjectMap = producerConfigs();DefaultKafkaProducerFactory<String, String> objectObjectDefaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(stringObjectMap);return new KafkaTemplate<>(objectObjectDefaultKafkaProducerFactory);}
}

3.配置文件

需配置 kafka.server-host 其他在代码中均已配置默认值

kafka:server-host: 192.168.1.113:9048

4.使用示例

@Slf4j
@RestController
@RequestMapping("test")
public class TestController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@PostMapping("test")public RestResult test() {kafkaTemplate.send("TOPIC_NAME", 0, "key", "this is a message");return RestResult.wrapSuccessResponse();}@KafkaListener(topics = "TOPIC_NAME", groupId = "MyGroup1", containerFactory = "kafkaListenerContainerFactory")public void kafkaListener(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {for (ConsumerRecord item : records) {System.out.printf("topic is %s, offset is %d,partition is %s, value is %s \n", item.topic(), item.offset(), item.partition(), item.value());log.info("topic is : {}, offset is : {},partition is : {}, value is : {}",item.topic(), item.offset(), item.partition(), item.value());}ack.acknowledge();}}

-- 230619更正 增加配置文件及默认值说明。

以上。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.exyb.cn/news/show-5313137.html

如若内容造成侵权/违法违规/事实不符,请联系郑州代理记账网进行投诉反馈,一经查实,立即删除!

相关文章

求分享如何批量压缩视频的容量的方法

视频内存过大&#xff0c;不但特别占内存&#xff0c;而且还会使手机电脑出现卡顿的现象&#xff0c;除此之外&#xff0c;如果我们想发送这些视频文件可能还会因为内存太大无法发送。因此&#xff0c;我们可以批量地压缩视频文件的内存大小&#xff0c;今天小编要来分享一招&a…

打印Winfrom控件实现简陋版的打印(C#)

本文在前面写的博文基础上进行修改&#xff1a;利用Graphics的CopyFromScreen实现简陋版的打印(C#)_zxy2847225301的博客-CSDN博客 通过截图的方式进行打印在前面的文章后面已经介绍过&#xff0c;有问题。 UI布局如下&#xff1a; 代码如下&#xff1a; using System; using…

FlinkSql维表join之Temporal table join

什么是维表join&#xff1f; 对于每条流式数据&#xff0c;可以关联一个外部维表数据源&#xff0c;为FlinkSql实时计算提供数据关联查询。 说明&#xff1a; 维表是一张不断变化的表&#xff0c;在维表JOIN时&#xff0c;需指明该条记录关联维表快照的时刻。维表JOIN仅支持对…

sql group by 加条件

在SQL中&#xff0c;可以在GROUP BY子句中加入条件&#xff0c;以进一步过滤结果。你可以使用HAVING子句来添加条件。HAVING子句的使用方式类似于WHERE子句&#xff0c;但不同的是&#xff0c;它用于对GROUP BY子句生成的分组进行过滤。 以下是一个示例&#xff0c;演示如何在…

目标检测中 anchor base和anchor free

目标检测中两种不同anchor的生成 趋势&#xff1a;anchor free越来越受到实时性检测的青睐&#xff0c;&#xff0c;&#xff0c;

基于双 STM32+FPGA 的桌面数控车床控制系统设计

桌 面数控 设 备 对 小 尺寸零件加工在成 本 、 功 耗 和 占 地 面 积等方 面有 着 巨 大 优 势 。 桌 面数控 设 备 大致 有 3 种 实 现 方 案 : 第 一种 为 微 型 机 床搭 配 传统 数控系 统 &#xff0c; 但 是 桌 面数控 设 备 对 成 本 敏感 ; 第二 种 为 基 于 PC…

生成对抗网络DCGAN学习实践

在AI内容生成领域&#xff0c;有三种常见的AI模型技术&#xff1a;GAN、VAE、Diffusion。其中&#xff0c;Diffusion是较新的技术&#xff0c;相关资料较为稀缺。VAE通常更多用于压缩任务&#xff0c;而GAN由于其问世较早&#xff0c;相关的开源项目和科普文章也更加全面&#…

idea application.yml配置文件没有提示或读不到配置

1.首先确定你的resources文件夹正常且yml文件图表和下面一样 不一样的右键去设置 2.确保你已经缩进了且层级关系正常 3.如果以上都不是&#xff0c;先考虑删除.idea重开试试 4.以上解决不了就装以下两个插件解决
推荐文章