Kafka Configuration and Usage

Kafka is a message middleware. It helps us send and consume messages, and can also be used for data synchronization.

First, add the Kafka dependency to the pom file.
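The original screenshot of the dependency did not survive; a typical spring-kafka dependency looks like the following (the version is usually inherited from the Spring Boot parent, so it is omitted here):

```xml
<!-- Spring for Apache Kafka; version normally managed by spring-boot-starter-parent -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```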
Next comes the core Kafka config class:
@Configuration
@Slf4j
public class KafkaTemplateConfig {

@Value("${kafka.archive.servers}")
private String KAFKA_SERVERS;

@Value("${kafka.archive.group}")
private String GROUPID;

// A field initializer would be overwritten by injection; put the default in the placeholder instead.
@Value("${kafka.thread.number:5}")
private Integer threadNum;


@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}


@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>(16);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS);
    props.put(ProducerConfig.RETRIES_CONFIG, "3");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 200);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    return props;
}


public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
@ConditionalOnMissingBean(name = "kafkaBatchListenerForReceiver")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaBatchListenerForReceiver() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory();
    factory.setConcurrency(1);
    return factory;
}

@Bean
@ConditionalOnMissingBean(name = "kafkaBatchListener")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaBatchListener() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory();
    factory.setConcurrency(threadNum);
    return factory;
}




public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>(16);
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

    // props.put(ConsumerConfig.CLIENT_ID_CONFIG, IPUtil.getHostName() + "-h5-kafka");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 9000);
    return props;
}

}
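The `@Value` placeholders above expect entries like the following in the application properties (the server address, group name, and topic value below are illustrative placeholders, not values from the original):

```properties
# Referenced by KafkaTemplateConfig
kafka.archive.servers=localhost:9092
kafka.archive.group=archive-consumer-group
kafka.thread.number=5
# Comma-separated topic list consumed by the listener shown later
kafka.topic.project.cooperation=project-cooperation-topic
```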

Finally, Kafka has two core methods: sending and receiving. First, the sending method.

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String topic, String key, String mes) {
    // Note: the KafkaTemplate is shared, so in production the listener should be
    // registered once at startup rather than on every send.
    kafkaTemplate.setProducerListener(new ProducerListener<String, String>() {
        @Override
        public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
            log.info("send message to topic [{}] success: partition: {}, current offset: {}, the msg: {}",
                    recordMetadata.topic(),
                    recordMetadata.partition(),
                    recordMetadata.offset(),
                    producerRecord.value());
        }

        @Override
        public void onError(ProducerRecord<String, String> producerRecord, Exception exception) {
            // Log the stack trace through the logger instead of printStackTrace().
            log.error("send message to topic [{}] failed: partition: {}, the errMsg: {}",
                    producerRecord.topic(),
                    producerRecord.partition(),
                    exception.toString(), exception);
        }
    });
    kafkaTemplate.send(topic, key, mes);
}

The method takes three parameters: String topic, String key, String mes. The first specifies the target topic. The second is the message key for this record. The third is the data to send: we wrap up our data and pass it in here. Note that the value can only be a String, so the data must be serialized to a JSON string before sending, and then parsed back on the receiving side.
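Serializing the payload to JSON is normally done with a library such as Jackson or Gson. The stdlib-only sketch below (the class, field names, and values are illustrative, not from the original) just shows what kind of string ends up in the third parameter:

```java
// Illustrative only: a minimal flat payload serialized to JSON by hand.
// Real code should use Jackson's ObjectMapper or Gson, which also handle escaping.
public class PayloadJsonDemo {

    // Builds a flat JSON object; assumes the values contain no quotes to escape.
    static String toJson(String projectId, String status) {
        return "{\"projectId\":\"" + projectId + "\",\"status\":\"" + status + "\"}";
    }

    public static void main(String[] args) {
        String json = toJson("P-1001", "APPROVED");
        // This string is what would be passed as the third argument of sendMessage.
        System.out.println(json);
    }
}
```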

Below is the receiving method.
@Trace
@KafkaListener(containerFactory = "kafkaBatchListener", idIsGroup = false,
        topics = {"#{'${kafka.topic.project.cooperation}'.split(',')}"})
public void consumeKafkaRequireStatus(ConsumerRecord<String, String> record) {
    log.info("the kafka is consuming the host data, topic={}, offset={}, partition={}, the data is={}",
            record.topic(),
            record.offset(),
            record.partition(),
            record.value());

    kafkaUtils.consumeKafkaRequireStatus(record.value());
}

The topics attribute specifies the topic(s) to consume from; it must match the topic used when sending.
kafkaUtils.consumeKafkaRequireStatus(record.value()) is where the received string is parsed back into data and the business logic operates on it.
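On the receiving side the JSON string is parsed back into fields. Production code should use a real JSON library (e.g. Jackson); the stdlib-only sketch below (class and field names are illustrative, not from the original) only handles a flat JSON object with no nesting or escaped quotes:

```java
// Illustrative only: pull one field out of a flat JSON string.
// Real code should deserialize with Jackson/Gson instead of string scanning.
public class PayloadParseDemo {

    static String extractField(String json, String key) {
        String marker = "\"" + key + "\":\"";
        int start = json.indexOf(marker);
        if (start < 0) {
            return null; // field not present
        }
        start += marker.length();
        int end = json.indexOf('"', start);
        return json.substring(start, end);
    }

    public static void main(String[] args) {
        String json = "{\"projectId\":\"P-1001\",\"status\":\"APPROVED\"}";
        System.out.println(extractField(json, "status")); // prints APPROVED
    }
}
```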
