Spring Boot and RabbitMQ Integration: Quick-Start Demo

HBLOG
4 min readApr 23, 2024

--

1. What is RabbitMQ?

RabbitMQ is message middleware written in Erlang and based on the AMQP protocol. As a messaging system it offers high concurrency and scalability, and it is well suited to communication between the modules of a large system.

RabbitMQ's main characteristics are:

  • Persistence, delivery acknowledgements, publisher confirms and other features ensure message reliability
  • Supports multiple message distribution modes, making processing more flexible
  • Provides visual management interface, easy to use
  • Support cluster deployment to ensure high service availability

2. RabbitMQ environment setup

# Docker Compose definition for a single RabbitMQ broker (management image).
version: '3'
services:
  rabbitmq:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/rabbitmq:3.7.8-management
    hostname: my-rabbit
    restart: unless-stopped
    environment:
      LANG: en_US.UTF-8
      # Virtual host and credentials created on first start; the application
      # configuration must use the same values.
      RABBITMQ_DEFAULT_VHOST: my_vhost
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin
    volumes:
      - "./rabbitmq/data:/var/lib/rabbitmq"
    ports:
      - "5672:5672"    # broker port used by the application
      - "15672:15672"  # web management console

run

docker-compose -f docker-compose-rabbitmq.yml -p rabbitmq up -d

Web management console: http://127.0.0.1:15672 (login username/password: admin/admin)

3. Code

Goal of the experiment: send and receive messages through RabbitMQ.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>rabbitmq</artifactId>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Spring AMQP / RabbitMQ support (declared once; the duplicate declaration was removed). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>

application.properties

server.port=8088
#rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
# Must match RABBITMQ_DEFAULT_VHOST in docker-compose-rabbitmq.yml (my_vhost).
spring.rabbitmq.virtual-host=my_vhost

config

Simple usage

package com.et.rabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that declares the queue used by the simple
 * send/receive example.
 */
@Configuration
public class RabbitConfig {

    /** Name of the queue created by {@link #Queue()}. */
    private static final String HELLO_QUEUE_NAME = "hello";

    /**
     * Registers a queue named {@code hello} as a bean.
     *
     * <p>NOTE(review): the method name doubles as the bean name, so it is left
     * as {@code Queue} even though it shadows the type name.
     *
     * @return a new {@link Queue} named {@code hello}
     */
    @Bean
    public Queue Queue() {
        final Queue helloQueue = new Queue(HELLO_QUEUE_NAME);
        return helloQueue;
    }
}

Topic is the most flexible exchange type in RabbitMQ: queues can be bound freely based on the routing key. First comes the topic rule configuration; two queues are used here for testing.

package com.et.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration for the topic-exchange example: two queues, one topic
 * exchange, and one binding per queue.
 */
@Configuration
public class TopicRabbitConfig {

    /** Name of the first topic queue. */
    public static final String TOPIC_ONE = "topic.one";

    /** Name of the second topic queue. */
    public static final String TOPIC_TWO = "topic.two";

    /** Name of the topic exchange both queues are bound to. */
    public static final String TOPIC_EXCHANGE = "topicExchange";

    /** Exact routing key used for the first queue's binding. */
    private static final String ROUTING_KEY_ONE = "topic.one";

    /**
     * Routing pattern used for the second queue's binding.
     * In a pattern, {@code #} stands for zero or more words and {@code *} for
     * exactly one word.
     */
    private static final String ROUTING_PATTERN_ALL = "topic.#";

    /**
     * @return the queue named {@link #TOPIC_ONE}
     */
    @Bean
    public Queue queue_one() {
        return new Queue(TOPIC_ONE);
    }

    /**
     * @return the queue named {@link #TOPIC_TWO}
     */
    @Bean
    public Queue queue_two() {
        return new Queue(TOPIC_TWO);
    }

    /**
     * @return the topic exchange named {@link #TOPIC_EXCHANGE}
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    /**
     * Binds the first queue to the exchange with the routing key {@code topic.one}.
     *
     * @param queue_one the first topic queue
     * @param exchange  the topic exchange
     * @return the binding between the two
     */
    @Bean
    Binding bindingExchangeOne(Queue queue_one, TopicExchange exchange) {
        final Binding binding = BindingBuilder
                .bind(queue_one)
                .to(exchange)
                .with(ROUTING_KEY_ONE);
        return binding;
    }

    /**
     * Binds the second queue to the exchange with the pattern {@code topic.#}.
     *
     * @param queue_two the second topic queue
     * @param exchange  the topic exchange
     * @return the binding between the two
     */
    @Bean
    Binding bindingExchangeTwo(Queue queue_two, TopicExchange exchange) {
        final Binding binding = BindingBuilder
                .bind(queue_two)
                .to(exchange)
                .with(ROUTING_PATTERN_ALL);
        return binding;
    }
}

Fanout is the familiar broadcast (publish/subscribe) model: when a message is sent to a fanout exchange, every queue bound to that exchange receives it.

package com.et.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration for the fanout example: three queues bound to a single
 * fanout exchange.
 */
@Configuration
public class FanoutRabbitConfig {

    /**
     * @return the queue named {@code fanout.A}
     */
    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    /**
     * @return the queue named {@code fanout.B}
     */
    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    /**
     * @return the queue named {@code fanout.C}
     */
    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }

    /**
     * @return the fanout exchange named {@code fanoutExchange}
     */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * @param AMessage       the {@code fanout.A} queue
     * @param fanoutExchange the fanout exchange
     * @return the binding of queue A to the exchange
     */
    @Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return attach(AMessage, fanoutExchange);
    }

    /**
     * @param BMessage       the {@code fanout.B} queue
     * @param fanoutExchange the fanout exchange
     * @return the binding of queue B to the exchange
     */
    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return attach(BMessage, fanoutExchange);
    }

    /**
     * @param CMessage       the {@code fanout.C} queue
     * @param fanoutExchange the fanout exchange
     * @return the binding of queue C to the exchange
     */
    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return attach(CMessage, fanoutExchange);
    }

    /** Builds the binding of {@code target} to {@code hub}; no routing key is involved. */
    private static Binding attach(Queue target, FanoutExchange hub) {
        return BindingBuilder.bind(target).to(hub);
    }
}

receiver

package com.et.rabbitmq.receiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
/**
 * Consumer for every queue declared in this demo: the simple {@code hello}
 * queue, the two topic queues and the three fanout queues.
 *
 * <p>Each listener just reports the received text; the log calls use
 * parameterized messages instead of string concatenation.
 */
@Service
@Slf4j
public class HelloReceiver {

    /**
     * Handles messages from the {@code hello} queue by printing them to
     * standard output.
     *
     * @param hello the message text
     */
    @RabbitListener(queues = "hello")
    public void process(String hello) {
        System.out.println("Receiver : " + hello);
    }

    /**
     * Logs messages arriving on the {@code topic.one} queue.
     *
     * @param fileBody the message payload
     */
    @RabbitListener(queues = {"topic.one"})
    public void receiveTopic1(@Payload String fileBody) {
        log.info("topic1:{}", fileBody);
    }

    /**
     * Logs messages arriving on the {@code topic.two} queue.
     *
     * @param fileBody the message payload
     */
    @RabbitListener(queues = {"topic.two"})
    public void receiveTopic2(@Payload String fileBody) {
        log.info("topic2:{}", fileBody);
    }

    /**
     * Logs messages arriving on the {@code fanout.A} queue.
     *
     * @param fileBody the message payload
     */
    @RabbitListener(queues = {"fanout.A"})
    public void fanoutA(@Payload String fileBody) {
        log.info("fanoutA:{}", fileBody);
    }

    /**
     * Logs messages arriving on the {@code fanout.B} queue.
     *
     * @param fileBody the message payload
     */
    @RabbitListener(queues = {"fanout.B"})
    public void fanoutB(@Payload String fileBody) {
        log.info("fanoutB:{}", fileBody);
    }

    /**
     * Logs messages arriving on the {@code fanout.C} queue.
     *
     * @param fileBody the message payload
     */
    @RabbitListener(queues = {"fanout.C"})
    public void fanoutC(@Payload String fileBody) {
        log.info("fanoutC:{}", fileBody);
    }
}

sender

package com.et.rabbitmq.sender;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
 * Publishes a timestamped greeting for the simple send/receive example.
 */
@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds the text {@code "hello "} followed by the current date, prints it
     * to standard output and hands it to the template with {@code "hello"} as
     * the routing key.
     */
    public void send() {
        final Date now = new Date();
        final String message = "hello " + now;
        System.out.println("Sender : " + message);
        rabbitTemplate.convertAndSend("hello", message);
    }
}
package com.et.rabbitmq.sender;
import com.et.rabbitmq.config.TopicRabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Publishes the two test messages of the topic-exchange example to
 * {@link TopicRabbitConfig#TOPIC_EXCHANGE}.
 */
@Component
public class TopicSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Sends a message with routing key {@code topic.one}.
     * That key satisfies both the exact binding {@code topic.one} and the
     * pattern binding {@code topic.#}, so both topic queues receive it.
     */
    public void send_one() {
        final String message = "Hi, I am message one";
        System.out.println("Sender : " + message);
        rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, "topic.one", message);
    }

    /**
     * Sends a message with routing key {@code topic.two}.
     * Only the pattern binding {@code topic.#} matches that key, so only the
     * second topic queue receives it.
     */
    public void send_two() {
        final String message = "Hi, I am message two";
        System.out.println("Sender : " + message);
        rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, "topic.two", message);
    }
}

DemoApplication.java

package com.et.quartz;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the demo application.
 *
 * <p>This class is declared in {@code com.et.quartz}, while the configuration,
 * sender and receiver classes are in {@code com.et.rabbitmq}. Component scanning
 * starts from the application class's own package by default, so the base
 * package is set explicitly; otherwise none of the RabbitMQ beans would be
 * registered.
 */
@SpringBootApplication(scanBasePackages = "com.et.rabbitmq")
public class DemoApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

The above is only the key code. For the complete code, see the code repository below.

code repository

4. Test

Simple usage

/**
 * Sends one message through {@code helloSender}, then pauses the test thread.
 *
 * @throws InterruptedException if the pause is interrupted
 */
@Test
public void hello() throws InterruptedException {
    helloSender.send();
    // Presumably keeps the test alive so the asynchronous listener can consume the message.
    Thread.sleep(50000);
}

Topic Exchange

/**
 * Sends the first topic message through {@code topicSender}, then pauses the
 * test thread.
 *
 * @throws InterruptedException if the pause is interrupted
 */
@Test
public void topicOne() throws InterruptedException {
    topicSender.send_one();
    // Presumably keeps the test alive so the asynchronous listeners can consume the message.
    Thread.sleep(50000);
}
/**
 * Sends the second topic message through {@code topicSender}, then pauses the
 * test thread.
 *
 * @throws InterruptedException if the pause is interrupted
 */
@Test
public void topicTwo() throws InterruptedException {
    topicSender.send_two();
    // Presumably keeps the test alive so the asynchronous listeners can consume the message.
    Thread.sleep(50000);
}

Fanout Exchange

/**
 * Publishes one message to the {@code fanoutExchange} exchange with an empty
 * routing key, then pauses the test thread.
 *
 * @throws InterruptedException if the pause is interrupted
 */
@Test
public void sendFanout() throws InterruptedException {
    final String message = "hi, fanout msg ";
    System.out.println("Sender : " + message);
    rabbitTemplate.convertAndSend("fanoutExchange", "", message);
    // Presumably keeps the test alive so the asynchronous listeners can consume the message.
    Thread.sleep(50_000);
}

5. References

--

--