Environment
Note: 192.168.31.211 is the IP address of the local machine.
# Zookeeper
# 1. Pull the Zookeeper image; omitting a tag pulls the default (latest) version
docker pull wurstmeister/zookeeper
# 2. Start the container
docker run -d --name zookeeper -p 2181:2181 -e TZ="Asia/Shanghai" wurstmeister/zookeeper:latest
# Kafka
# 1. Pull the Kafka image; omitting a tag pulls the default (latest) version
docker pull wurstmeister/kafka
# 2. Start the container
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.31.211:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.31.211:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e TZ="Asia/Shanghai" wurstmeister/kafka:latest
# 1. Pull the kafka-manager UI image; omitting a tag pulls the default (latest) version
docker pull sheepkiller/kafka-manager
# 2. Start the container
docker run -d --name kfk-manager -p 9000:9000 -e ZK_HOSTS=192.168.31.211:2181 sheepkiller/kafka-manager:latest
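Before wiring up the Spring project, you can confirm the broker is reachable with a short Java check. A minimal sketch using the Kafka AdminClient (the ConnectivityCheck class is just an illustration; it assumes the kafka-clients jar, which spring-kafka brings in transitively):

import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ConnectivityCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.211:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Listing topics forces a round trip to the broker,
            // so success here means the advertised listener is reachable
            Set<String> topics = admin.listTopics().names().get();
            System.out.println("Broker reachable, topics: " + topics);
        }
    }
}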
Project
Dependencies
<!-- Web starter, included to make testing easier -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.4.0.RELEASE</version>
</dependency>
Configuration
# application.yml
spring:
  application:
    name: hello-simple-kafka
  kafka:
    listener:
      # Listener type: single (one record per call, the default) or batch
      # (a whole poll per call; see the batch sketch after this configuration)
      type: single
    # Cluster address(es)
    bootstrap-servers: 192.168.31.211:9092
    # Producer configuration
    producer:
      # Number of retries
      retries: 3
      # Acknowledgement level
      # acks=0: the send is considered successful as soon as the message is handed to Kafka
      # acks=1: the send is considered successful once the leader partition has written the message to its log
      # acks=all: the send is considered successful only after the leader's follower replicas have also replicated the message
      acks: all
      # Maximum size of a record batch, in bytes
      batch-size: 4096
      # Total memory, in bytes, the producer may use to buffer records waiting to be sent (32 MB here)
      buffer-memory: 33554432
      # Client ID
      client-id: hello-kafka
      # Key serializer class
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer class
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Message compression: none, lz4, gzip, snappy; default is none
      compression-type: gzip
    # Consumer configuration
    consumer:
      # Default consumer group
      group-id: testGroup
      # Whether to auto-commit offsets (default true)
      enable-auto-commit: false
      # Auto-commit interval, in ms
      auto-commit-interval: 1000
      # Maximum number of records returned per poll (upper bound for batch consumption)
      max-poll-records: 100
      # Key deserializer class
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer class
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # What to do when Kafka has no initial offset, or the current offset is out of range:
      # earliest: reset to the smallest offset in the partition
      # latest: reset to the newest offset (only consume records produced from now on)
      # none: throw an exception if any partition has no committed offset
      auto-offset-reset: latest
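The listener type above is single; if it were switched to batch, a listener method could receive up to max-poll-records messages per call. A minimal sketch under that assumption (BatchKafkaConsumer and batchGroup are illustrative names, not part of the project above):

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class BatchKafkaConsumer {

    private static final Logger log = LoggerFactory.getLogger(BatchKafkaConsumer.class);

    // With spring.kafka.listener.type: batch, the payload is the whole poll
    // result, bounded by max-poll-records (100 in the configuration above)
    @KafkaListener(groupId = "batchGroup", topics = "test")
    public void listen(List<String> messages) {
        log.info("received a batch of {} messages", messages.size());
    }
}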
Producer
import javax.annotation.Resource;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    public boolean send(String msg) {
        // send() is asynchronous: this only hands the record to the producer's buffer
        kafkaTemplate.send("test", msg);
        return true;
    }
}
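Note that kafkaTemplate.send() is asynchronous, so returning true above does not prove the broker accepted the record. A minimal sketch of attaching a callback to the ListenableFuture that send() returns in spring-kafka 2.4 (CallbackKafkaProducer is an illustrative name):

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
public class CallbackKafkaProducer {

    private static final Logger log = LoggerFactory.getLogger(CallbackKafkaProducer.class);

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(String msg) {
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                // RecordMetadata carries the topic, partition and offset assigned by the broker
                log.info("sent ok: topic={}, offset={}",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("send failed", ex);
            }
        });
    }
}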
Consumer
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(groupId = "3", topics = "test")
    public void listen(String msg, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("group 3 ==> topic_test consumed: Topic: " + topic + ", Message: " + msg);
    }

    @KafkaListener(groupId = "1", topics = "test")
    public void topic_test(String record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        // The payload may be null (e.g. a tombstone record), so guard with Optional
        Optional<String> message = Optional.ofNullable(record);
        if (message.isPresent()) {
            String msg = message.get();
            log.info("group 1 ==> topic_test consumed: Topic: " + topic + ", Message: " + msg);
        }
    }
}
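With enable-auto-commit set to false above, the Spring listener container commits offsets itself (BATCH ack mode by default). To commit manually instead, here is a sketch assuming spring.kafka.listener.ack-mode is set to manual_immediate, which is not part of the configuration above (ManualAckConsumer is an illustrative name):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ManualAckConsumer {

    private static final Logger log = LoggerFactory.getLogger(ManualAckConsumer.class);

    // Assumes spring.kafka.listener.ack-mode: manual_immediate (not set in the config above)
    @KafkaListener(groupId = "2", topics = "test")
    public void listen(String msg, Acknowledgment ack) {
        log.info("group 2 consumed: {}", msg);
        // Commit the offset only after the record has been processed successfully
        ack.acknowledge();
    }
}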
Simple invocation
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/")
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/send")
    public String sendMessage() {
        return kafkaProducer.send("hello") ? "success" : "failed";
    }
}
Test
Visit http://localhost:8080/send
Console output:
2022-06-08 21:17:34.062 INFO 828 --- [ntainer#0-0-C-1] org.example.kafka.KafkaConsumer : group 3 ==> topic_test consumed: Topic: test, Message: hello
2022-06-08 21:17:34.062 INFO 828 --- [ntainer#1-0-C-1] org.example.kafka.KafkaConsumer : group 1 ==> topic_test consumed: Topic: test, Message: hello