1.Chronicle Queue Introduction
Chronicle Queue persists every message in a memory-mapped file, which lets multiple processes share messages. Because it stores data directly in off-heap memory, it avoids GC overhead. It is designed as a low-latency messaging framework for high-performance applications: with the open-source Chronicle Queue you can build very large durable queues while keeping latency predictable and consistently low.
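To make the memory-mapped file idea concrete, here is a minimal sketch that uses only the JDK's java.nio classes. It is not how Chronicle Queue is implemented internally; the class name MappedFileSketch, the length-prefixed layout, and the temp-file name are assumptions made for illustration. It shows one piece of code writing a message through a mapped region and another reading it back from the same file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MappedFileSketch {

    // Writes a length-prefixed message at the start of a memory-mapped file.
    static void write(Path file, String message) throws IOException {
        byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Mapping beyond the current file size grows the file to fit.
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 4 + bytes.length);
            buffer.putInt(bytes.length);
            buffer.put(bytes);
            buffer.force(); // ask the OS to flush the mapped region to the file
        }
    }

    // Maps the same file read-only and decodes the length-prefixed message.
    static String read(Path file) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
            byte[] bytes = new byte[buffer.getInt()];
            buffer.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Convenience helper (hypothetical): write to a fresh temp file, then read it back.
    static String roundTrip(String message) {
        try {
            Path file = Files.createTempFile("mapped-sketch", ".dat");
            write(file, message);
            return read(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("Hello World")); // prints "Hello World"
    }
}
```

Because the data lives in a file-backed mapping rather than in Java objects, a second process that maps the same file could read it as well, which is the property the paragraph above relies on.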
Chronicle Queue has three core concepts:
- Excerpt: a data container
- Appender: used for writing data
- Tailer: used for sequentially reading data
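We will use the Chronicle interface to reserve this memory for read and write operations. Note that the snippets in this section use the older Chronicle Queue 3.x API (Chronicle, startExcerpt, nextIndex), while the project in section 2 uses the ChronicleQueue API from version 5.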
Create a queue
File queueDir = Files.createTempDirectory("chronicle-queue").toFile();
Chronicle chronicle = ChronicleQueueBuilder.indexed(queueDir).build();
We will need a base directory where the queue will persist records in memory mapped files.
The ChronicleQueueBuilder class can build different types of queues. In this case, we used IndexedChronicleQueue, which uses a sequential index to keep track of the memory offset of each record in the queue.
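The idea of a sequential index that maps each record to its memory offset can be sketched with plain JDK classes. This is a toy model only, not the on-disk format of IndexedChronicleQueue; the class OffsetIndexSketch, the fixed 1024-byte buffer, and the in-memory list used as the index are all assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class OffsetIndexSketch {
    // Off-heap buffer standing in for the memory-mapped data file.
    private final ByteBuffer data = ByteBuffer.allocateDirect(1024);
    // The index: position i holds the byte offset of record i.
    private final List<Integer> offsets = new ArrayList<>();

    // Appends a length-prefixed record and returns its sequential index.
    int append(String record) {
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        offsets.add(data.position());
        data.putInt(bytes.length).put(bytes);
        return offsets.size() - 1;
    }

    // Jumps straight to the record's offset instead of scanning from the start.
    String read(int index) {
        ByteBuffer view = data.duplicate(); // independent position, shared content
        view.position(offsets.get(index));
        byte[] bytes = new byte[view.getInt()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        OffsetIndexSketch log = new OffsetIndexSketch();
        log.append("first");
        log.append("second");
        int third = log.append("third");
        System.out.println(log.read(1));     // prints "second"
        System.out.println(log.read(third)); // prints "third"
    }
}
```

The point of the index is that reading record N costs one lookup plus one read, regardless of how many variable-length records precede it.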
Write to the queue
To write items to the queue, we need to create an ExcerptAppender from the Chronicle instance. Below is sample code for writing a message to the queue.
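ExcerptAppender appender = chronicle.createAppender();
// Start a new excerpt (message) with the default capacity
appender.startExcerpt();

String stringVal = "Hello World";
int intVal = 101;
long longVal = System.currentTimeMillis();
double doubleVal = 90.00192091d;

// Write the fields in a fixed order; the reader must use the same order
appender.writeUTF(stringVal);
appender.writeInt(intVal);
appender.writeLong(longVal);
appender.writeDouble(doubleVal);
// Complete the excerpt so it becomes visible to readers
appender.finish();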
After creating the appender, we start a new excerpt with the startExcerpt method. It starts an Excerpt with a default message capacity of 128K. We can use an overloaded version of startExcerpt to provide a custom capacity.
Once the excerpt is started, we can write any literal or object value to the queue using the various write methods provided by the library.
Finally, when we are done writing, we call finish to complete the excerpt; the data is then part of the queue and is persisted to disk through the memory-mapped file.
Read information from the queue
Reading values from the queue can be easily done using an ExcerptTailer instance.
It is like the iterator we use in Java to traverse a collection.
Let's read the values from the queue:
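ExcerptTailer tailer = chronicle.createTailer();
// nextIndex() returns true while there is another excerpt to read
while (tailer.nextIndex()) {
    // Read the fields in the same order they were written
    tailer.readUTF();
    tailer.readInt();
    tailer.readLong();
    tailer.readDouble();
}
tailer.finish();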
After creating the tailer, we use the nextIndex method to check whether there is a new excerpt to read. Once ExcerptTailer has a new Excerpt to read, we can read its contents with the read methods for literal and object values, in the same order in which they were written.
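The write and read snippets above mirror each other field by field (UTF string, int, long, double). The sketch below shows why that matters, using the JDK's DataOutputStream and DataInputStream as a stand-in for the appender and tailer. It is an analogy under stated assumptions, not Chronicle Queue code; the class FieldOrderSketch and the "|"-separated result string are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class FieldOrderSketch {

    // Serializes the four fields in a fixed order, like the appender snippet.
    static byte[] writeRecord(String s, int i, long l, double d) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(s);
            out.writeInt(i);
            out.writeLong(l);
            out.writeDouble(d);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in exactly the order they were written.
    static String readRecord(byte[] record) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(record))) {
            String s = in.readUTF();
            int i = in.readInt();
            long l = in.readLong();
            double d = in.readDouble();
            return s + "|" + i + "|" + l + "|" + d;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] record = writeRecord("Hello World", 101, System.currentTimeMillis(), 90.00192091d);
        System.out.println(readRecord(record));
    }
}
```

The bytes carry no field names or type tags, so swapping two reads (for example readLong before readInt) would silently decode the wrong values. The same discipline applies to the raw read and write methods used on excerpts.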
2.Code Project
The goal is to create a queue that is durable, concurrent, low-latency, accessible from multiple processes, and able to hold billions of objects.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>Chronicle-Queue</artifactId>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>net.openhft</groupId>
            <artifactId>chronicle-queue</artifactId>
            <version>5.25ea12</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>
    </dependencies>
</project>
Entity
package com.et.chronicle.queue.entity;

import net.openhft.chronicle.wire.Marshallable;

// Implementing Marshallable lets Chronicle Wire serialize the fields of this class
public class Person implements Marshallable {
    private String name;
    private int age;

    public Person() {
        super();
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return Marshallable.$toString(this);
    }
}
3.Test
Send and receive text messages
package com.et.chronicle.queue;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;

class TestQueueMain {
    ChronicleQueue queue;

    @BeforeEach
    void setUp() throws Exception {
        // Queue files are stored under the build target directory
        String basePath = OS.getTarget() + "/Queue1";
        // Roll over to a new queue file every five minutes
        queue = ChronicleQueue.singleBuilder(basePath).rollCycle(RollCycles.FIVE_MINUTELY).build();
    }

    @AfterEach
    void tearDown() throws Exception {
        queue.close();
    }

    @Test
    void testWrite() {
        ExcerptAppender appender = queue.acquireAppender();
        try {
            for (int i = 0; i < 50000; i++) {
                appender.writeText("Hello World(hello world)!--" + i);
            }
        } finally {
            appender.close();
        }
    }

    // Reads whatever messages are already in the queue, so run testWrite first
    @Test
    void testRead() {
        // A named tailer identifies this reader as "reader1"
        ExcerptTailer tailer = queue.createTailer("reader1");
        try {
            String readText = null;
            // readText() returns null when there are no more messages
            while ((readText = tailer.readText()) != null) {
                System.out.println("read: " + readText);
            }
        } finally {
            tailer.close();
        }
    }
}
Send and receive object messages
package com.et.chronicle.queue;

import com.alibaba.fastjson.JSONObject;
import com.et.chronicle.queue.entity.Person;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;

class TestQueueObject {
    ChronicleQueue queue;

    @BeforeEach
    void setUp() throws Exception {
        String basePath = OS.getTarget() + "/QueueDocument";
        queue = ChronicleQueue.singleBuilder(basePath).rollCycle(RollCycles.FIVE_MINUTELY).build();
    }

    @AfterEach
    void tearDown() throws Exception {
        queue.close();
    }

    @Test
    void testMarshallable() {
        // Write five Person objects as documents
        ExcerptAppender appender = queue.acquireAppender();
        try {
            for (int i = 0; i < 5; i++) {
                Person person = new Person();
                person.setName("Rob");
                person.setAge(40 + i);
                appender.writeDocument(person);
            }
        } finally {
            appender.close();
        }

        // Read the documents back into a reusable Person instance
        ExcerptTailer tailer = queue.createTailer("reader1");
        try {
            Person person2 = new Person();
            while (tailer.readDocument(person2)) {
                System.out.println(JSONObject.toJSON(person2));
            }
        } finally {
            // Close the tailer here (the original closed the appender a second time)
            tailer.close();
        }
    }
}