一、rocketmq introduce
RocketMQ is a pure java、distribution、queue model open source messaging middleware, it is an queue model messaging middleware is developed by alibaba with reference to the characteristic. of kafka. it was later open source to Apache Foundation and became Apache’s top open source project. it. has high performance high reliability 、high real-time and distributed characteristic
二、rocketmq environment setup
use docker-compose to build,the specific config as follower:
version: '3'
services:
# rocket mq name server
rmqnamesrv:
image: apache/rocketmq:4.9.6
restart: always
container_name: rocket-server
# environment:
# JAVA_OPT_EXT: "-server -Xms64m -Xmx64m -Xmn64m"
# volumes:
# - ../volumes/data/rocket/server/logs:/home/rocketmq/logs
networks:
- rocketmq
ports:
- 9876:9876
command: sh mqnamesrv
# rocket mq broker
rmqbroker:
image: apache/rocketmq:4.9.6
restart: always
container_name: rocket-broker
volumes:
# - ../volumes/data/rocket/broker/logs:/home/rocketmq/logs
# - ../volumes/data/rocket/broker/store:/home/rocketmq/store
- ./config/broker.conf:/opt/rocketmq-4.9.6/conf/broker.conf
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
# - JAVA_OPTS:=-Duser.home=/opt
- JAVA_OPT_EXT=-server -Xms64m -Xmx64m -Xmn64m
depends_on:
- rmqnamesrv
networks:
- rocketmq
ports:
- 10909:10909
- 10911:10911
command: sh mqbroker -c /opt/rocketmq-4.9.6/conf/broker.conf
rmqdashboard:
image: apacherocketmq/rocketmq-dashboard:1.0.0
restart: always
container_name: rocket-dashboard
environment:
- JAVA_OPTS=-Drocketmq.config.namesrvAddr=rmqnamesrv:9876 -Dserver.port=8180 -Drocketmq.config.isVIPChannel=false
# - JAVA_OPT_EXT=-Xms128m -Xmx128m -Xmn128m
depends_on:
- rmqnamesrv
networks:
- rocketmq
ports:
- 8180:8180
networks:
rocketmq:
driver: bridge
stack:
driver: bridge
run docker-compose
docker-compose -f docker-compose-rocketmq.yml -p rocketmq up -d
notice:modify xx/rocketmq/rocketmq_broker/conf/broker.conf brokerIP1 is current local hostIP
access url:http://ip:
8180
三、code project
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springboot-demo</artifactId>
<groupId>com.et</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.8.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
entity
package com.et59.rocketmq.entity;
/**
* @author liuhaihua
* @version 1.0
* @ClassName Person
* @Description todo
* @date 2023年12月29日 9:52
*/
public class Person {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
listener
package com.et59.rocketmq.listener;
import com.et59.rocketmq.entity.Person;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @Author: heyuhua
* @Date: 2021/1/8 15:55
*/
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "PERSON_ADD")
public class PersonMqListener implements RocketMQListener<Person> {
@Override
public void onMessage(Person person) {
System.out.println("receiver message,start consumer..name:" + person.getName() + ",age:" + person.getAge());
}
}
util
package com.et59.rocketmq.util;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
* RocketMq asistant
*
* @Author: heyuhua
* @Date: 2020/1/8 10:03
*/
@Component
public class RocketMqHelper {
/**
* 日志
*/
private static final Logger LOG = LoggerFactory.getLogger(RocketMqHelper.class);
/**
* rocketmq template
*/
@Autowired
private RocketMQTemplate rocketMQTemplate;
@PostConstruct
public void init() {
LOG.info("---RocketMq助手初始化---");
}
/**
*send async message
*
* @param topic
* @param message
*/
public void asyncSend(Enum topic, Message<?> message) {
asyncSend(topic.name(), message, getDefaultSendCallBack());
}
/**
* send async message
*
* @param topic
* @param message
* @param sendCallback
*/
public void asyncSend(Enum topic, Message<?> message, SendCallback sendCallback) {
asyncSend(topic.name(), message, sendCallback);
}
/**
* send async message
*
* @param topic
* @param message
*/
public void asyncSend(String topic, Message<?> message) {
rocketMQTemplate.asyncSend(topic, message, getDefaultSendCallBack());
}
/**
* send async message
*
* @param topic
* @param message
* @param sendCallback
*/
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback) {
rocketMQTemplate.asyncSend(topic, message, sendCallback);
}
/**
* send async message
*
* @param topic
* @param message
* @param sendCallback
* @param timeout
*/
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
}
/**
* send async message
*
* @param topic
* @param message
* @param sendCallback
* @param timeout
* @param delayLevel
*/
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);
}
/**
* send sequntia message
*
* @param message
* @param topic
* @param hashKey
*/
public void syncSendOrderly(Enum topic, Message<?> message, String hashKey) {
syncSendOrderly(topic.name(), message, hashKey);
}
/**
* send sequntia message
*
* @param message
* @param topic
* @param hashKey
*/
public void syncSendOrderly(String topic, Message<?> message, String hashKey) {
LOG.info("send sequntia message,topic:" + topic + ",hashKey:" + hashKey);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
}
/**
* send sequntia message
*
* @param message
* @param topic
* @param hashKey
* @param timeout
*/
public void syncSendOrderly(String topic, Message<?> message, String hashKey, long timeout) {
LOG.info("send sequntia message,topic:" + topic + ",hashKey:" + hashKey + ",timeout:" + timeout);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
}
/**
* default CallBack function
*
* @return
*/
private SendCallback getDefaultSendCallBack() {
return new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
LOG.info("---send MQ success---");
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
LOG.error("---send MQ fail---"+throwable.getMessage(), throwable.getMessage());
}
};
}
@PreDestroy
public void destroy() {
LOG.info("---RocketMq助手注销---");
}
}
config file(application.yaml)
server:
port: 8088
rocketmq:
name-server: 10.11.68.77:9876
producer:
isOnOff: on
group: hyh-rocketmq-group
groupName: hyh-rocketmq-group
namesrvAddr: 10.11.68.77:9876
maxMessageSize: 4096
sendMsgTimeout: 3000
retryTimesWhenSendFailed: 2
DemoApplication.java
package com.et59.rocketmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
code repository
四、test
package com.et59.rocketmq;
import com.et59.rocketmq.entity.Person;
import com.et59.rocketmq.util.RocketMqHelper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class MQTests {
@Autowired
private RocketMqHelper rocketMqHelper;
@Test
public void testProducter() throws InterruptedException {
for(int i=0;i<1000000;i++){
Person person = new Person();
person.setName("heyuhua");
person.setAge(25);
rocketMqHelper.asyncSend("PERSON_ADD", MessageBuilder.withPayload(person).build());
Thread.sleep(1000);
}
}
}
the result
2024-02-02 15:20:27.101 INFO 11188 --- [ublicExecutor_3] com.et59.rocketmq.util.RocketMqHelper : ---send MQ success---
2024-02-02 15:20:28.116 INFO 11188 --- [ublicExecutor_4] com.et59.rocketmq.util.RocketMqHelper : ---send MQ success---
2024-02-02 15:20:29.130 INFO 11188 --- [ublicExecutor_5] com.et59.rocketmq.util.RocketMqHelper : ---send MQ success---
2024-02-02 15:20:30.131 INFO 11188 --- [ublicExecutor_7] com.et59.rocketmq.util.RocketMqHelper : ---send MQ success---
2024-02-02 15:20:31.142 INFO 11188 --- [ublicExecutor_8] com.et59.rocketmq.util.RocketMqHelper : ---send MQ success---
2024-02-02 15:20:32.156 INFO 11188 --- [ublicExecutor_9] com.et59.rocketmq.util.RocketMqHelper : ---send MQ success---
2024-02-02 15:20:33.167 INFO 11188 --- [blicExecutor_10] com.et59.rocketmq.util.RocketMqHelper : ---send MQ success---
receive messages,start to message..name:heyuhua,age:25
receive messages,start to message..name:heyuhua,age:25
receive messages,start to message..name:heyuhua,age:25
receive messages,start to message..name:heyuhua,age:25
receive messages,start to message..name:heyuhua,age:25
2024-02-02 15:20:34.182 INFO 11188 --- [ublicExecutor_3] com.et59.rocketmq.util.RocketMqHelper : ---send MQ success---
receive messages,start to message..name:heyuhua,age:25
receive messages,start to message..name:heyuhua,age:25
receive messages,start to message..name:heyuhua,age:25
receive messages,start to message..name:heyuhua,age:25
receive messages,start to message..name:heyuhua,age:25
receive messages,start to message..name:heyuhua,age:25