Spring Boot integrated with Kafka: quick start demo

HBLOG · 3 min read · Feb 23, 2024

1. Kafka introduction

Kafka is open-source software based on a distributed publish-subscribe messaging system. Its goals are high throughput, low latency, scalability, and fault tolerance. Kafka stores messages in a configurable number of partitions to achieve horizontal scalability, supports multiple producers and consumers, and provides solid reliability guarantees. It also supports data replication, failover, and offline data processing, and is widely used for website activity tracking, log collection and analysis, stream processing, and message queuing.

2. Environment setup

Use docker-compose to set up the test environment. The configuration is as follows:

version: '3'

networks:
  kafka:
    ipam:
      driver: default
      config:
        - subnet: "172.22.6.0/24"

services:
  zookeeper:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latest  # original image `bitnami/zookeeper:latest`
    container_name: zookeeper-server
    volumes:
      - "/etc/localtime:/etc/localtime"
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"
    ports:
      - "2181:2181"
    networks:
      kafka:
        ipv4_address: 172.22.6.11

  kafka:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1  # original image `bitnami/kafka:3.4.1`
    container_name: kafka
    restart: unless-stopped  # always restart unless the container was explicitly stopped
    volumes:  # map host paths into the container
      - "/etc/localtime:/etc/localtime"
    environment:
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181  # zookeeper address
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092  # TODO fill in your host IP or domain so clients can reach the broker (host.docker.internal resolves the host IP automatically when running Docker on Windows or Mac)
    ports:  # port mapping
      - "9092:9092"
    depends_on:  # start zookeeper before kafka
      - zookeeper
    networks:
      kafka:
        ipv4_address: 172.22.6.12

  # kafka-map web management UI
  kafka-map:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-map  # original image `dushixiang/kafka-map:latest`
    container_name: kafka-map
    restart: unless-stopped  # always restart unless the container was explicitly stopped
    volumes:
      - "./kafka/kafka-map/data:/usr/local/kafka-map/data"
    environment:
      DEFAULT_USERNAME: admin
      DEFAULT_PASSWORD: 123456
    ports:  # port mapping
      - "8080:8080"
    depends_on:  # start kafka before kafka-map
      - kafka
    networks:
      kafka:
        ipv4_address: 172.22.6.13

Start the test environment:

docker-compose -f docker-compose-kafka.yml -p kafka up -d

kafka-map

kafka-map is a web UI for managing Kafka clusters: https://github.com/dushixiang/kafka-map

Open it at http://localhost:8080 with the credentials from the compose file (admin / 123456), then:

add cluster

create topic (testTopic)
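
If you prefer to create the topic from code instead of the kafka-map UI, spring-kafka (2.3+) can declare it as a bean, and Spring Boot's auto-configured KafkaAdmin creates it on startup if it does not already exist. A minimal sketch; the class name TopicConfig and the partition/replica counts are assumptions, not part of the original project:

package com.et.kafaka.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    @Bean
    public NewTopic testTopic() {
        return TopicBuilder.name("testTopic")
                .partitions(1) // one partition is enough for the demo
                .replicas(1)   // single broker in the compose file
                .build();
    }
}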

3. Code project

This project has the producer send messages to the topic [testTopic], which the consumer then receives.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>kafaka</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <!-- kafka dependencies begin -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- kafka dependencies end -->
    </dependencies>
</project>

application.properties

server.port=8088

#### kafka producer config begin ####
#============== kafka ===================
# Kafka server address; for a cluster, list multiple addresses separated by commas
spring.kafka.bootstrap-servers=127.0.0.1:9092

#=============== producer =======================
# Number of retries when a write fails. When the leader node fails, a replica takes over as
# leader, and writes may fail during the transition. With retries=0 the producer does not
# resend, so a message sent in that window can be lost; with retries > 0 the producer resends
# once the replica has fully become the leader.
spring.kafka.producer.retries=0
# Batch size in bytes; the producer accumulates records and sends them in one request
spring.kafka.producer.batch-size=16384
# Total memory the producer can use to buffer records waiting to be sent; data is flushed when
# the buffer fills up
spring.kafka.producer.buffer-memory=33554432
# Number of acknowledgments the producer requires the leader to have received before considering
# a request complete; controls the durability of records on the server. Possible values:
# acks=0   The producer does not wait for any acknowledgment from the server. The record is added
#          to the socket buffer and considered sent. There is no guarantee the server received it,
#          the retry configuration has no effect (the client generally won't know about failures),
#          and the offset returned for each record is always -1.
# acks=1   The leader writes the record to its local log and responds without waiting for all
#          replicas. If the leader fails right after acknowledging but before the replicas have
#          copied the record, the record is lost.
# acks=all The leader waits for the full set of in-sync replicas to acknowledge the record. The
#          record is not lost as long as at least one in-sync replica stays alive. This is the
#          strongest guarantee and is equivalent to acks=-1.
# Allowed values: all, -1, 0, 1
spring.kafka.producer.acks=1
# Serializers for the message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#### kafka producer config end ####

#### kafka consumer config begin ####
# Default consumer group id; within a group, consumers never read the same message twice
spring.kafka.consumer.group-id=test
# Where to start when there is no committed offset: earliest reads from the beginning of the
# log, latest reads only new records. earliest is the usual choice for a demo.
spring.kafka.consumer.auto-offset-reset=earliest
# enable-auto-commit=true: commit offsets automatically
spring.kafka.consumer.enable-auto-commit=true
# If enable-auto-commit is true, how often (in milliseconds) offsets are committed; default 5000
spring.kafka.consumer.auto-commit-interval=1000
# Deserializers for the message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#### kafka consumer config end ####
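
Spring Boot turns the spring.kafka.* properties above into producer and consumer factories automatically, so no Java configuration is needed. For reference, a sketch of what the producer side corresponds to programmatically; the class name ProducerConfigSketch is made up, and the values simply mirror the properties above:

package com.et.kafaka.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class ProducerConfigSketch {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);              // no resend on failure
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);       // bytes per batch
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32 MB send buffer
        props.put(ProducerConfig.ACKS_CONFIG, "1");               // leader-only acknowledgment
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}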

consumer

package com.et.kafaka.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Listens on topic testTopic
 *
 * @author Lynch
 */
@Component
public class ConsumerListener {

    @KafkaListener(topics = "testTopic")
    public void onMessage(String message) {
        // process the message here, e.g. persist it to a database
        System.out.println("message: " + message);
    }
}
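
The configuration above auto-commits offsets on a timer, so a message could be marked consumed even if processing fails. A sketch of a manual-acknowledgment alternative; it assumes spring.kafka.consumer.enable-auto-commit=false and spring.kafka.listener.ack-mode=manual are set instead, and it would replace the listener above rather than run alongside it:

package com.et.kafaka.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * Variant listener that commits offsets manually.
 */
@Component
public class ManualAckListener {

    @KafkaListener(topics = "testTopic")
    public void onMessage(String message, Acknowledgment ack) {
        System.out.println("message: " + message);
        ack.acknowledge(); // commit the offset only after the message is processed
    }
}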

producer

package com.et.kafaka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * @author Lynch
 */
@Controller
@RequestMapping("/api/kafka/")
public class KafkaController {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("send")
    @ResponseBody
    public boolean send(@RequestParam String message) {
        try {
            kafkaTemplate.send("testTopic", message);
            System.out.println("send success...");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return true;
    }

    @GetMapping("test")
    @ResponseBody
    public String test() {
        System.out.println("hello world!");
        return "ok";
    }
}
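
Note that kafkaTemplate.send() is asynchronous: the "send success..." line only means the record was handed to the producer's buffer, not that the broker acknowledged it. Below is a sketch of a callback-based variant that could be added to KafkaController; the endpoint name sendWithCallback is made up here, and the snippet assumes spring-kafka 2.x, where send() returns a ListenableFuture (on 3.x it returns a CompletableFuture, so use whenComplete instead):

// hypothetical extra endpoint for KafkaController that logs the broker's response
@GetMapping("sendWithCallback")
@ResponseBody
public String sendWithCallback(@RequestParam String message) {
    kafkaTemplate.send("testTopic", message).addCallback(
            result -> System.out.println("send success, offset="
                    + result.getRecordMetadata().offset()), // broker acknowledged the record
            ex -> System.err.println("send failed: " + ex.getMessage()));
    return "submitted";
}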

start class

package com.et.kafaka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

code repository

4. Test

  1. Start the Java application.
  2. Producer: http://localhost:8088/api/kafka/send?message=aaabbbcccdddd
  3. Consumer receives the message [aaabbbcccdddd]:

send success...
message: aaabbbcccdddd
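
Because the pom already includes spring-kafka-test, the round trip can also be verified without the docker-compose environment by running against an embedded broker. A sketch, assuming JUnit 5 (e.g. via spring-boot-starter-test) is also on the test classpath:

package com.et.kafaka;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = "testTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class KafkaRoundTripTest {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void sendAndReceive() {
        // build a test consumer wired to the embedded broker
        Map<String, Object> props = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafka);
        Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(
                props, new StringDeserializer(), new StringDeserializer()).createConsumer();
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "testTopic");

        kafkaTemplate.send("testTopic", "aaabbbcccdddd");

        // the message sent above should come back on the same topic
        ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "testTopic");
        assertEquals("aaabbbcccdddd", record.value());
        consumer.close();
    }
}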

