Administrator
发布于 2022-06-08 / 964 阅读
0
0

springboot Kafka 简单使用

环境

note: 192.168.31.211 是本机地址

# Zookeeper
//1、拉取kafka镜像 不指定版本就拉取默认版本
docker pull wurstmeister/zookeeper
//2、启动命令
docker run -d --name zookeeper -p 2181:2181 -e TZ="Asia/Shanghai" wurstmeister/zookeeper:latest

# kafka
//1、拉取kafka镜像 不指定版本就拉取默认版本
docker pull wurstmeister/kafka
//2、启动命令
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.31.211:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.31.211:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e TZ="Asia/Shanghai" wurstmeister/kafka:latest

//1、拉取kafka-manager管理端镜像 不指定版本就拉取默认版本
docker pull sheepkiller/kafka-manager
//2、启动命令
docker run -d --name kfk-manager -p 9000:9000 -e ZK_HOSTS=192.168.31.211:2181 sheepkiller/kafka-manager:latest

项目

依赖

<!-- 导入 web 方便测试 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.4.0.RELEASE</version>
</dependency>

配置

# application.yml
spring:
  application:
    name: hello-simple-kafka
  kafka:
    listener:
      #设置是否批量消费,默认 single(单条),batch(批量)
      type: single
    # 集群地址
    bootstrap-servers: 192.168.31.211:9092
    # 生产者配置
    producer:
      # 重试次数
      retries: 3
      # 应答级别
      # acks=0 把消息发送到kafka就认为发送成功
      # acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
      # acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
      acks: all
      # 批量处理的最大大小 单位 byte
      batch-size: 4096
      # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
      buffer-memory: 33554432
      # 客户端ID
      client-id: hello-kafka
      # Key 序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value 序列化类
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 消息压缩:none、lz4、gzip、snappy,默认为 none。
      compression-type: gzip
    # 消费者配置
    consumer:
      # 默认消费者组
      group-id: testGroup
      # 自动提交 offset 默认 true
      enable-auto-commit: false
      # 自动提交的频率 单位 ms
      auto-commit-interval: 1000
      # 批量消费最大数量
      max-poll-records: 100
      # Key 反序列化类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value 反序列化类
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 当kafka中没有初始offset或offset超出范围时将自动重置offset
      # earliest:重置为分区中最小的offset
      # latest:重置为分区中最新的offset(消费分区中新产生的数据)
      # none:只要有一个分区不存在已提交的offset,就抛出异常
      auto-offset-reset: latest

生产者

@Component
public class KafkaProducer {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    public boolean send(String msg) {
        kafkaTemplate.send("test", msg);
        return true;
    }
}

消费者

@Component
public class KafkaConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(groupId = "3", topics = "test")
    public void listen(String msg, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
        log.info("group 3 ==> topic_test 消费了: Topic: " + topic + ",Message:" + msg);
    }

    @KafkaListener(groupId = "1", topics = "test")
    public void topic_test(String record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
        Optional<Object> message = Optional.ofNullable(record);
        if(message.isPresent()){
            Object msg = message.get();
            log.info("group 1 ==> topic_test 消费了: Topic: " + topic + ",Message:" + msg);
        }
    }
}

简单调用

@RestController
@RequestMapping("/")
public class KafkaController {
    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/send")
    public String sendMessage() {
        return kafkaProducer.send("hello") ? "success" : "failed";
    }
}

测试

访问 http://localhost:8080/send
控制台输出

2022-06-08 21:17:34.062  INFO 828 --- [ntainer#0-0-C-1] org.example.kafka.KafkaConsumer          : group 3 ==> topic_test 消费了: Topic: test,Message:hello
2022-06-08 21:17:34.062  INFO 828 --- [ntainer#1-0-C-1] org.example.kafka.KafkaConsumer          : group 1 ==> topic_test 消费了: Topic: test,Message:hello

评论