Integrating Kafka with Spring Boot


I won't repeat an introduction to Kafka here; see the earlier post Building a Kafka cluster with docker-compose, which covers the basics of Kafka along with environment setup. That setup is based on Docker and docker-compose; if you would rather not go through Docker, you can also download the Kafka distribution and start it directly on a machine, as covered in another earlier post, Kafka operations notes. Today we focus on how to produce and consume Kafka data from Spring Boot.

Configuring the POM

The first step, of course, is to add the spring-kafka dependency to the POM; with the Spring Boot parent, the dependency version is managed for you:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configure the basic Kafka settings

spring:
  kafka:
    bootstrap-servers: localhost:9092  # separate multiple servers with `,`
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 0 # number of retries on failure
      batch-size: 16384
      buffer-memory: 33554432
      acks: -1 # allowed values: all, -1, 0, 1
    consumer:
      enable-auto-commit: true
      auto-commit-interval: 5000
      group-id: group-test
      auto-offset-reset: earliest # where to start consuming: earliest, latest, none (default: latest)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Custom producer topic
kafka:
  producer:
    topic: test-producer

A detailed reference for these settings: spring-kafka producer and consumer configuration explained.

Start the ZooKeeper and Kafka services

Start ZooKeeper:

./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

Start Kafka:

./bin/kafka-server-start.sh -daemon config/server.properties

Check the processes with jps:

➜  kafka_2.11-2.3.0 jps
27442
40473 QuorumPeerMain # the ZooKeeper process
40795 Kafka # the Kafka process
40847 Jps

Producing messages

To make the data easy to inspect, we define a Metric class to hold it, send a record to the Kafka broker every 3 seconds, and kick the sending off on startup via a CommandLineRunner.

  • The Metric class
import com.google.gson.annotations.SerializedName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.Map;

/**
 * Created by xiaozhangge on 2020-01-07.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Metric implements Serializable {

    private String hostname;

    private long total;

    @SerializedName("succ_cnt")
    private long succCnt;

    @SerializedName("succ_rate")
    private float succRate;

    private int status;

    private LocalDateTime timestamp;

    private Map<String, Object> tags;
}
  • The Spring Boot application class
import com.google.common.collect.Maps;
import com.xiaozhangge.common.util.DecimalUtils;
import com.xiaozhangge.common.util.IdHelper;
import com.xiaozhangge.common.util.JsonParse;
import com.xiaozhangge.kafka.pojo.Metric;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.core.KafkaTemplate;

import java.time.LocalDateTime;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Created by xiaozhangge on 2020-01-06.
 * Dynamically configuring multiple topics:
 * https://github.com/spring-projects/spring-kafka/issues/361
 */
@SpringBootApplication
public class KafkaApplication implements CommandLineRunner {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Maps to the custom property we defined in the yml file; used to look up
     * the topic we send data to.
     */
    @Value("${kafka.producer.topic}")
    private String producerTopic;

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        Random random = new Random();
        while (true) {
            writer2Kafka(random);
            // send data to Kafka every 3 seconds
            TimeUnit.SECONDS.sleep(3);
        }
    }

    private void writer2Kafka(Random random) {
        Metric metric = new Metric();
        metric.setHostname("NODE-" + IdHelper.id());
        metric.setTotal(random.nextInt(10000));
        metric.setSuccCnt(random.nextInt(9900));

        float v = DecimalUtils.divide(metric.getSuccCnt(), metric.getTotal());
        metric.setSuccRate(v);
        metric.setStatus(random.nextInt(2));
        metric.setTimestamp(LocalDateTime.now());
        Map<String, Object> tagMap = Maps.newHashMap();
        tagMap.put("cpu_util", random.nextFloat() * 100);
        tagMap.put("mem_util", random.nextFloat() * 100);
        metric.setTags(tagMap);

        kafkaTemplate.send(producerTopic, JsonParse.toJson(metric));
        kafkaTemplate.flush();
    }
}
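One caveat about the producer above: kafkaTemplate.send() is asynchronous, and calling flush() after every record blocks the sender and defeats batching; it is fine for a demo but not something to copy into production code. Below is a minimal sketch of observing the delivery result through the returned future instead, assuming spring-kafka 2.x (the log output here is illustrative, not from the original code):

import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

// Inside writer2Kafka(), replacing the send()/flush() pair:
ListenableFuture<SendResult<String, String>> future =
        kafkaTemplate.send(producerTopic, JsonParse.toJson(metric));
future.addCallback(
        // invoked once the broker acknowledges the record (per the acks setting)
        result -> System.out.println(
                "sent to " + result.getRecordMetadata().topic()
                        + " at offset " + result.getRecordMetadata().offset()),
        // invoked when the send ultimately fails, after any configured retries
        ex -> System.err.println("send failed: " + ex.getMessage()));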

After the application starts, the topic is created automatically if it does not exist (we have not disabled automatic topic creation). The data in the topic can be viewed with the kafka-console-consumer command:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-producer --from-beginning

# Sample data
{"hostname":"NODE-55","total":6704,"succ_cnt":1441,"succ_rate":0.2149463,"status":0,"timestamp":"2020-01-07T21:29:21.501","tags":{"mem_util":5.2691402,"cpu_util":97.53448}}
{"hostname":"NODE-23","total":9579,"succ_cnt":2191,"succ_rate":0.22872952,"status":0,"timestamp":"2020-01-07T21:29:25.422","tags":{"mem_util":27.203756,"cpu_util":43.626713}}
{"hostname":"NODE-8","total":6889,"succ_cnt":786,"succ_rate":0.114094935,"status":0,"timestamp":"2020-01-07T21:29:28.428","tags":{"mem_util":18.526094,"cpu_util":87.67309}}
{"hostname":"NODE-142","total":8753,"succ_cnt":4227,"succ_rate":0.48292014,"status":0,"timestamp":"2020-01-07T21:29:31.436","tags":{"mem_util":87.426476,"cpu_util":12.49879}}
{"hostname":"NODE-198","total":2261,"succ_cnt":4251,"succ_rate":1.8801415,"status":0,"timestamp":"2020-01-07T21:29:34.444","tags":{"mem_util":5.665392,"cpu_util":69.29729}}
{"hostname":"NODE-46","total":2846,"succ_cnt":6600,"succ_rate":2.3190444,"status":0,"timestamp":"2020-01-07T21:29:37.451","tags":{"mem_util":66.49958,"cpu_util":18.604118}}
{"hostname":"NODE-47","total":5110,"succ_cnt":9650,"succ_rate":1.888454,"status":1,"timestamp":"2020-01-07T21:29:40.459","tags":{"mem_util":92.13784,"cpu_util":60.55072}}
Processed a total of 195 messages
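Note how the timestamp field comes out as an ISO-8601 string. Gson does not serialize LocalDateTime that way out of the box, so the JsonParse helper used above (its source is not shown in this article) presumably registers a type adapter. A minimal sketch of what such a helper might look like, assuming Gson is the JSON library (the class name JsonParseSketch is made up for illustration):

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;

import java.io.IOException;
import java.time.LocalDateTime;

public final class JsonParseSketch {

    // Register an adapter so LocalDateTime serializes as an ISO-8601 string
    // such as "2020-01-07T21:29:21.501" instead of Gson's default field dump.
    private static final Gson GSON = new GsonBuilder()
            .registerTypeAdapter(LocalDateTime.class, new TypeAdapter<LocalDateTime>() {
                @Override
                public void write(JsonWriter out, LocalDateTime value) throws IOException {
                    out.value(value.toString());
                }

                @Override
                public LocalDateTime read(JsonReader in) throws IOException {
                    return LocalDateTime.parse(in.nextString());
                }
            }.nullSafe())
            .create();

    public static String toJson(Object o) {
        return GSON.toJson(o);
    }

    public static <T> T parse(String json, Class<T> clazz) {
        return GSON.fromJson(json, clazz);
    }
}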

The console consumer shows that the newly written data is indeed in Kafka. So how do we consume it from Spring Boot?

Consuming messages

Producing data is simple, and consuming it is not much harder. The key is the @KafkaListener annotation provided by Spring Kafka. We define a consumer class of our own: we just wrote to the test-producer topic, and now the application consumes from that topic.

Commonly used @KafkaListener attributes

  • topics sets the topic(s) to consume
  • groupId specifies the consumer group

MetricConsumer


import com.xiaozhangge.common.util.JsonParse;
import com.xiaozhangge.kafka.pojo.Metric;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Created by xiaozhangge on 2020-01-07.
 */
@Slf4j
@Component
public class MetricConsumer {

    @KafkaListener(topics = "test-producer")
    public void event(ConsumerRecord<String, String> record) {
        Optional<String> kafkaMessage = Optional.ofNullable(record.value());
        kafkaMessage.ifPresent(x -> {
            Metric metric = JsonParse.parse(x, Metric.class);
            log.info("消费kafka中的数据:{}", metric);
        });
    }
}

The consumed data:

2020-01-07 21:38:10.646  INFO 42501 --- [ntainer#0-0-C-1] c.i.kafka.consumer.MetricConsumer        : Consumed message from Kafka: Metric(hostname=NODE-128, total=7849, succCnt=9252, succRate=1.1787488, status=1, timestamp=2020-01-07T21:38:10.639, tags={mem_util=83.20216, cpu_util=2.36848})
2020-01-07 21:38:13.651  INFO 42501 --- [ntainer#0-0-C-1] c.i.kafka.consumer.MetricConsumer        : Consumed message from Kafka: Metric(hostname=NODE-199, total=3799, succCnt=595, succRate=0.15662016, status=0, timestamp=2020-01-07T21:38:13.647, tags={mem_util=37.34904, cpu_util=56.30006})
2020-01-07 21:38:16.662  INFO 42501 --- [ntainer#0-0-C-1] c.i.kafka.consumer.MetricConsumer        : Consumed message from Kafka: Metric(hostname=NODE-82, total=2797, succCnt=3004, succRate=1.0740079, status=1, timestamp=2020-01-07T21:38:16.654, tags={mem_util=87.53287, cpu_util=53.029602})
2020-01-07 21:38:19.671  INFO 42501 --- [ntainer#0-0-C-1] c.i.kafka.consumer.MetricConsumer        : Consumed message from Kafka: Metric(hostname=NODE-102, total=3232, succCnt=3041, succRate=0.9409035, status=0, timestamp=2020-01-07T21:38:19.664, tags={mem_util=87.7293, cpu_util=51.062298})
2020-01-07 21:38:22.679  INFO 42501 --- [ntainer#0-0-C-1] c.i.kafka.consumer.MetricConsumer        : Consumed message from Kafka: Metric(hostname=NODE-173, total=904, succCnt=759, succRate=0.83960176, status=0, timestamp=2020-01-07T21:38:22.672, tags={mem_util=3.923422, cpu_util=36.679466})
2020-01-07 21:38:25.685  INFO 42501 --- [ntainer#0-0-C-1] c.i.kafka.consumer.MetricConsumer        : Consumed message from Kafka: Metric(hostname=NODE-4, total=2250, succCnt=8072, succRate=3.5875556, status=1, timestamp=2020-01-07T21:38:25.681, tags={mem_util=71.37522, cpu_util=2.6286244})
2020-01-07 21:38:28.696  INFO 42501 --- [ntainer#0-0-C-1] c.i.kafka.consumer.MetricConsumer        : Consumed message from Kafka: Metric(hostname=NODE-87, total=2798, succCnt=3955, succRate=1.4135096, status=0, timestamp=2020-01-07T21:38:28.689, tags={mem_util=98.27863, cpu_util=23.622477})
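As an aside, the listener method does not have to take a ConsumerRecord: Spring Kafka can bind the record's value directly as a method parameter. A hedged alternative signature for the same listener (the article's ConsumerRecord version works just as well):

@KafkaListener(topics = "test-producer")
public void event(String message) {
    // message is the record value, already extracted by Spring Kafka
    Metric metric = JsonParse.parse(message, Metric.class);
    log.info("Consumed message from Kafka: {}", metric);
}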

You may have noticed one thing, though: our topic is hard-coded in the program and cannot be read dynamically from the configuration file, which is clearly not acceptable. So dynamic configuration is the next problem. You might suggest injecting the value with @Value, but that does not even compile, because an annotation attribute's value must be a constant. The solution is to use a SpEL expression.

Issue link: dynamically configuring multiple topics

The modified code

package com.xiaozhangge.kafka.consumer;

import com.xiaozhangge.common.util.JsonParse;
import com.xiaozhangge.kafka.pojo.Metric;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Created by xiaozhangge on 2020-01-07.
 * <p>
 * Dynamically configuring multiple topics:
 * https://github.com/spring-projects/spring-kafka/issues/361
 */
@Slf4j
@Component
public class MetricConsumer {


    /**
     * Note the SpEL expression in the topics attribute below.
     * The groupId can be configured with a SpEL expression in the same way,
     * so that is not repeated here.
     *
     * @param record the consumed record
     */
    @KafkaListener(topics = "#{'${kafka.producer.topic}'}", groupId = "metric-group")
    public void event(ConsumerRecord<String, String> record) {
        Optional<String> kafkaMessage = Optional.ofNullable(record.value());
        kafkaMessage.ifPresent(x -> {
            Metric metric = JsonParse.parse(x, Metric.class);
            log.info("消费kafka中的数据:{}", metric);
        });
    }
}

After this change, restart the process: data is still consumed as before, and the topic is now configured dynamically. If you need to configure several topics, just split the value with split(), as in the sketch below.
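A minimal sketch of the multi-topic case, assuming a comma-separated property such as kafka.consumer.topics=topic-a,topic-b (the property name is made up for illustration; the pattern itself comes from the GitHub issue linked above):

// SpEL evaluates '${kafka.consumer.topics}'.split(',') to a String[],
// which the topics attribute accepts.
@KafkaListener(topics = "#{'${kafka.consumer.topics}'.split(',')}", groupId = "metric-group")
public void event(ConsumerRecord<String, String> record) {
    log.info("Consumed from topic {}: {}", record.topic(), record.value());
}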

Summary

This post walked through integrating Kafka with Spring Boot. Day-to-day development involves various other message brokers besides Kafka, but the integration steps are much the same, since Spring Boot already wraps most of the work for us. The one thing to watch out for is dynamic injection into annotation attributes, which requires a SpEL expression.

Full code: https://github.com/xiaomozhang/personal-life-example/tree/master/example-kafka

