Spring Boot integrated with Netty to implement WebSocket communication

HBLOG
5 min read · Mar 9, 2024


一、Netty introduction

Netty is an open-source Java framework that originated at JBoss and is now an independent project hosted on GitHub. It provides an asynchronous, event-driven network application framework and tools for quickly developing high-performance, highly reliable network servers and clients. In other words, Netty is a client-side and server-side programming framework built on NIO. With Netty you can quickly and easily develop a network application, such as a client or server that implements a particular protocol. It simplifies and streamlines network programming, for example socket server development over TCP and UDP. “Fast” and “simple” here do not come at the cost of maintainability or performance. Netty draws on experience gained from implementing many protocols (including FTP, SMTP, HTTP, and various binary and text-based protocols) and is carefully designed. As a result, it makes development easy while preserving the performance, stability, and scalability of the applications built on it.

二、Code project

Experimental goal: push messages to specified users

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>netty</artifactId>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.87.Final</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.6.1</version>
        </dependency>
    </dependencies>
</project>

application.yaml

server:
  port: 8088

netty server

package com.et.netty.server;

import com.et.netty.config.ProjectInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;

/**
 * Starts the Netty WebSocket server together with the Spring Boot application.
 *
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName NettyServer.java
 * @createTime 2023/02/06 16:41:00
 */
@Component
public class NettyServer {
    static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    /**
     * Netty server port
     */
    @Value("${webSocket.netty.port:8889}")
    int port;

    // Assigned on the server thread and read by destroy(), hence volatile
    volatile EventLoopGroup bossGroup;
    volatile EventLoopGroup workGroup;

    @Autowired
    ProjectInitializer nettyInitializer;

    @PostConstruct
    public void start() {
        // Run on a separate thread so that closeFuture().sync() does not block Spring startup
        new Thread(() -> {
            bossGroup = new NioEventLoopGroup();
            workGroup = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.localAddress(new InetSocketAddress(port));
            bootstrap.childHandler(nettyInitializer);
            try {
                ChannelFuture channelFuture = bootstrap.bind().sync();
                log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
                // Block this thread until the server channel is closed
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("Netty server thread interrupted", e);
                Thread.currentThread().interrupt();
            }
        }).start();
    }

    @PreDestroy
    public void destroy() throws InterruptedException {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
    }
}

ProjectInitializer

Initializes the channel pipeline and registers the WebSocket handler.

package com.et.netty.config;

import com.et.netty.handler.WebSocketHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName ProjectInitializer.java
 * @description: pipeline configuration
 * @createTime 2023/02/06 16:43:00
 */
@Component
public class ProjectInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * WebSocket subprotocol name
     */
    static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * WebSocket path
     */
    @Value("${webSocket.netty.path:/webSocket}")
    String webSocketPath;

    @Autowired
    WebSocketHandler webSocketHandler;

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // HTTP codec, needed for the initial WebSocket handshake request
        pipeline.addLast(new HttpServerCodec());
        // Encodes Serializable objects using Java serialization
        pipeline.addLast(new ObjectEncoder());
        // Supports writing large data streams in chunks
        pipeline.addLast(new ChunkedWriteHandler());
        // Aggregates HTTP message parts into one full request (content up to 8192 bytes)
        pipeline.addLast(new HttpObjectAggregator(8192));
        // Performs the handshake on webSocketPath and handles close/ping/pong frames;
        // extensions are allowed and the maximum frame size is 65536 * 10 bytes
        pipeline.addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
        // Business handler for text frames
        pipeline.addLast(webSocketHandler);
    }
}
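
NettyServer and ProjectInitializer read their settings through @Value placeholders with defaults (port 8889 and path /webSocket). To override them, the same keys can presumably be added to application.yaml. A sketch, assuming the keys are spelled exactly as in the placeholders (the values shown are just the defaults):

```yaml
server:
  port: 8088          # Spring MVC port used by the REST controller

webSocket:
  netty:
    port: 8889        # read by NettyServer
    path: /webSocket  # read by ProjectInitializer
```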

WebSocketHandler

package com.et.netty.handler;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.et.netty.config.NettyConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Tracks connected channels and binds each channel to the user ID sent by the client.
 *
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName WebSocketHandler.java
 * @createTime 2023/02/06 16:44:00
 */
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private static final Logger log = LoggerFactory.getLogger(WebSocketHandler.class);

    /**
     * Channel attribute that stores the user ID bound to a connection
     */
    private static final AttributeKey<String> USER_ID_KEY = AttributeKey.valueOf("userId");

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("new connection:[{}]", ctx.channel().id().asLongText());
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    /**
     * Reads a text frame such as {"uid":"sss"} and registers the channel under that user ID.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("received msg:{}", msg.text());
        JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        String uid = jsonObject.getStr("uid");
        if (uid == null) {
            // ConcurrentHashMap does not accept null keys
            log.warn("message without uid ignored");
            return;
        }
        NettyConfig.getChannelMap().put(uid, ctx.channel());
        ctx.channel().attr(USER_ID_KEY).setIfAbsent(uid);
        ctx.channel().writeAndFlush(new TextWebSocketFrame("server receive msg"));
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("user offline:{}", ctx.channel().id().asLongText());
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("error on channel {}", ctx.channel().id().asLongText(), cause);
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * Removes the mapping between the user and this channel.
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        String userId = ctx.channel().attr(USER_ID_KEY).get();
        if (userId != null) {
            // Remove only if the user is still mapped to this channel
            NettyConfig.getChannelMap().remove(userId, ctx.channel());
        }
    }
}

NettyConfig

package com.et.netty.config;

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName NettyConfig.java
 * @description: config
 */
public class NettyConfig {

    /** All connected channels, used for broadcasting */
    private static volatile ChannelGroup channelGroup = null;

    /** Maps a user ID to that user's channel */
    private static volatile ConcurrentHashMap<String, Channel> channelMap = null;

    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();

    // Lazily created with double-checked locking
    public static ChannelGroup getChannelGroup() {
        if (null == channelGroup) {
            synchronized (lock1) {
                if (null == channelGroup) {
                    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
            }
        }
        return channelGroup;
    }

    public static ConcurrentHashMap<String, Channel> getChannelMap() {
        if (null == channelMap) {
            synchronized (lock2) {
                if (null == channelMap) {
                    channelMap = new ConcurrentHashMap<>();
                }
            }
        }
        return channelMap;
    }

    public static Channel getChannel(String userId) {
        return getChannelMap().get(userId);
    }
}
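
One detail of a user-to-channel map like channelMap is worth spelling out: when a user reconnects, cleanup for the old connection may run after the new connection has already been stored, and removing by key alone would then drop the new entry. The following is a minimal sketch of a guard against that, not code from the repository. `UserRegistry` and its methods are hypothetical names, and a generic type stands in for Netty's `Channel` so the example needs only the JDK.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical registry; C stands in for io.netty.channel.Channel. */
final class UserRegistry<C> {
    private final ConcurrentMap<String, C> connections = new ConcurrentHashMap<>();

    /** Binds the user to a connection and returns the previous connection, or null. */
    C register(String userId, C connection) {
        return connections.put(userId, connection);
    }

    /** Removes the mapping only if the user is still bound to this exact connection. */
    boolean unregister(String userId, C connection) {
        if (userId == null) {
            return false; // ConcurrentHashMap rejects null keys
        }
        return connections.remove(userId, connection);
    }

    /** Returns the connection currently bound to the user, or null. */
    C lookup(String userId) {
        return userId == null ? null : connections.get(userId);
    }

    public static void main(String[] args) {
        UserRegistry<String> registry = new UserRegistry<>();
        registry.register("sss", "old-connection");
        registry.register("sss", "new-connection");   // the user reconnects
        registry.unregister("sss", "old-connection"); // late cleanup of the old connection
        System.out.println(registry.lookup("sss"));   // prints "new-connection"
    }
}
```

The two-argument `ConcurrentHashMap.remove(key, value)` performs the comparison and the removal atomically, so no extra locking is needed.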

controller

package com.et.netty.controller;

import com.et.netty.service.PushMsgService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test endpoints that trigger a push over the WebSocket connections.
 *
 * @author dongliang7
 * @projectName
 * @ClassName TestController.java
 * @createTime 2023/02/06 17:48:00
 */
@RestController
@RequestMapping("/push")
public class TestController {
    @Autowired
    PushMsgService pushMsgService;

    // GET /push/{uid}: push a message to one user
    @GetMapping("/{uid}")
    public void pushOne(@PathVariable String uid) {
        pushMsgService.pushMsgToOne(uid, "hello-------------------------");
    }

    // GET /push/pushAll: push a message to every connected user
    @GetMapping("/pushAll")
    public void pushAll() {
        pushMsgService.pushMsgToAll("hello all-------------------------");
    }
}

PushMsgService

package com.et.netty.service;

/**
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName PushMsgService.java
 */
public interface PushMsgService {
    /**
     * Push a message to the specified user
     */
    void pushMsgToOne(String userId, String msg);

    /**
     * Push a message to all users
     */
    void pushMsgToAll(String msg);
}

PushMsgServiceImpl

package com.et.netty.service;

import com.et.netty.config.NettyConfig;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.stereotype.Service;

import java.util.Objects;

@Service
public class PushMsgServiceImpl implements PushMsgService {
    @Override
    public void pushMsgToOne(String userId, String msg) {
        Channel channel = NettyConfig.getChannel(userId);
        if (Objects.isNull(channel)) {
            throw new RuntimeException("connection is not open");
        }
        channel.writeAndFlush(new TextWebSocketFrame(msg));
    }

    @Override
    public void pushMsgToAll(String msg) {
        NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
    }
}

This article shows only the key code. The complete code can be found in the netty module of the code repository.

Code repository

三、Test

Start the Spring Boot application:

2024-03-08 11:21:32.975  INFO 10348 --- [       Thread-2] com.et.netty.server.NettyServer          : Server started and listen on:/0:0:0:0:0:0:0:0:8889

Open Postman, create a WebSocket connection, enter ws://127.0.0.1:8889/webSocket, and click the Connect button.

Send a message to the Netty server:

{"uid":"sss"}

Open your browser and push a message to user sss by visiting http://127.0.0.1:8088/push/sss
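
The manual steps above can also be scripted. The following is a rough sketch rather than part of the project: it assumes Java 11 or later for `java.net.http` (the demo itself targets Java 8, so this would live outside it), the application running locally, and the default ports and path used in this article. `PushSmokeTest` and `registrationMessage` are hypothetical names.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.WebSocket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical smoke test; requires Java 11+ and the demo application running locally. */
final class PushSmokeTest {

    /** Builds the registration payload the handler parses, e.g. {"uid":"sss"}. */
    static String registrationMessage(String uid) {
        String escaped = uid.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"uid\":\"" + escaped + "\"}";
    }

    public static void main(String[] args) throws Exception {
        String uid = "sss";
        HttpClient client = HttpClient.newHttpClient();
        BlockingQueue<String> received = new LinkedBlockingQueue<>();

        WebSocket.Listener listener = new WebSocket.Listener() {
            private final StringBuilder buffer = new StringBuilder();

            @Override
            public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
                buffer.append(data);
                if (last) { // a text message may arrive in several parts
                    received.add(buffer.toString());
                    buffer.setLength(0);
                }
                // The default implementation requests the next message
                return WebSocket.Listener.super.onText(ws, data, last);
            }
        };

        // Assumed defaults from the article: Netty port 8889, path /webSocket
        WebSocket ws = client.newWebSocketBuilder()
                .buildAsync(URI.create("ws://127.0.0.1:8889/webSocket"), listener)
                .join();

        // Register the uid, then wait for the server's acknowledgement
        ws.sendText(registrationMessage(uid), true).join();
        System.out.println("ack: " + received.poll(5, TimeUnit.SECONDS));

        // Trigger a push through the REST endpoint on the Spring Boot port (8088)
        HttpRequest push = HttpRequest.newBuilder(URI.create("http://127.0.0.1:8088/push/" + uid))
                .GET()
                .build();
        client.send(push, HttpResponse.BodyHandlers.discarding());
        System.out.println("pushed: " + received.poll(5, TimeUnit.SECONDS));

        ws.sendClose(WebSocket.NORMAL_CLOSURE, "done").join();
    }
}
```

Waiting for the acknowledgement before calling the REST endpoint matters: the server only maps the uid to the channel once it has processed the registration message, so pushing earlier could hit the "connection is not open" error.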
