1.disruptor introduction
What is Disruptor?
- Disruptor is a high-performance concurrency framework developed by British foreign exchange trading company LMAX. It can be considered as an efficient and low-latency memory message component for inter-thread communication. Its biggest feature is high performance. Unlike Kafka and RabbitMQ, which are used for message queues between services, disruptor is generally used to transfer messages between multiple threads in a JVM.。
- From a functional point of view, Disruptor implements the “queue” function, and it is a bounded queue (in fact, it is a lock-free inter-thread communication library). The function is similar to ArrayBlockingQueue, but disruptor is much better than ArrayBlockingQueue in terms of function and performance.
Advantages of Disruptor
- The most direct application scenario of Disruptor is naturally the application scenario of the “producer-consumer” model. Although we can also do this using JDK’s BlockingQueue, the performance of Disruptor is about 5~10 times higher than that of BlockingQueue:
- In other words, Disruptor can do everything BlockingQueue can do and do it better. At the same time, Disruptor can do more:
2.Code Project
Experimental goal: Use disruptor to send and receive messages
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springboot-demo</artifactId>
<groupId>com.et</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>disruptor</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
application.yaml
server:
port: 8088
model
package com.et.disruptor.model;
import lombok.Data;
@Data
public class MessageModel {
private String message;
}
consumer
package com.et.disruptor.event;
import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {
@Override
public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
try {
Thread.sleep(1000);
log.info("consume message start");
if (event != null) {
log.info("the message is:{}",event);
}
} catch (Exception e) {
log.info("consume message fail");
}
log.info("consume message ending");
}
}
producer
package com.et.disruptor.event;
import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @author liuhaihua
* @version 1.0
* @ClassName HelloEventProducer
* @Description todo
* @date 2024年03月29日 13:26
*/
@Component
public class HelloEventProducer {
@Autowired
RingBuffer<MessageModel> ringBuffer;
public synchronized void sayHelloMq(String message){
EventTranslator<MessageModel> et = new EventTranslator<MessageModel>() {
@Override
public void translateTo(MessageModel messageModel, long l) {
messageModel.setMessage(message);
}
};
ringBuffer.publishEvent(et);
}
}
HelloEventFactory
package com.et.disruptor.event;
import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.EventFactory;
public class HelloEventFactory implements EventFactory<MessageModel> {
@Override
public MessageModel newInstance() {
return new MessageModel();
}
}
config
package com.et.disruptor.config;
import com.et.disruptor.event.HelloEventFactory;
import com.et.disruptor.event.HelloEventHandler;
import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Configuration
public class MqManager {
@Bean("ringBuffer")
public RingBuffer<MessageModel> messageModelRingBuffer() {
//define the thread pool for consumer message hander, Disruptor touch the consumer event to process by java.util.concurrent.ExecutorSerivce
ExecutorService executor = Executors.newFixedThreadPool(2);
//define Event Factory
HelloEventFactory factory = new HelloEventFactory();
//ringbuffer byte size
int bufferSize = 1024 * 256;
Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy());
//set consumer event
disruptor.handleEventsWith(new HelloEventHandler());
//start disruptor thread
disruptor.start();
//gain ringbuffer ring,to product event
RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
return ringBuffer;
}
}
controller
package com.et.disruptor.controller;
import com.et.disruptor.event.HelloEventProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.HashMap;
import java.util.Map;
@Controller
public class HelloWorldController {
@Autowired
HelloEventProducer helloEventProducer;
@RequestMapping("/hello")
@ResponseBody
public Map<String, Object> showHelloWorld(){
Map<String, Object> map = new HashMap<>();
map.put("msg", "HelloWorld");
return map;
}
@RequestMapping("/send")
@ResponseBody
public String add(String message){
helloEventProducer.sayHelloMq(message);
return "success";
}
}
Main Class
package com.et.disruptor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
The above are just some key codes. For all codes, please see the code repository below.
Code repository
3.Test
- Start Spring Boot application
- Browser input:http://127.0.0.1:8088/send?message=hello%20world
- console output
2024-03-29 14:00:54.828 INFO 22784 --- [pool-1-thread-1] c.et.disruptor.event.HelloEventHandler : consume message start
2024-03-29 14:00:54.828 INFO 22784 --- [pool-1-thread-1] c.et.disruptor.event.HelloEventHandler : the message is:MessageModel(message=hello world)
2024-03-29 14:00:54.831 INFO 22784 --- [pool-1-thread-1] c.et.disruptor.event.HelloEventHandler : consume message ending