commit 2138f97bd70eb7a6e0bf13d1d80a3e5374ef2b2c Author: chuzhongzai Date: Tue Jan 30 18:20:10 2024 +0800 first commit diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..bee7927 --- /dev/null +++ b/pom.xml @@ -0,0 +1,68 @@ + + + 4.0.0 + + com.demo + SecertChat + 1.0-SNAPSHOT + + + 21 + 21 + UTF-8 + + + + + io.netty + netty-all + 4.1.101.Final + + + + org.projectlombok + lombok + 1.18.30 + + + + com.fasterxml.jackson.core + jackson-databind + 2.15.1 + + + + + + + org.graalvm.buildtools + native-maven-plugin + 0.9.28 + + com.demo.Main + SecretChat + true + + --gc=G1 + -H:+ReportExceptionStackTraces + + + true + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 21 + 21 + --enable-preview + + + + + + \ No newline at end of file diff --git a/src/main/java/com/demo/Main.java b/src/main/java/com/demo/Main.java new file mode 100644 index 0000000..3e68406 --- /dev/null +++ b/src/main/java/com/demo/Main.java @@ -0,0 +1,7 @@ +package com.demo; + +public class Main { + public static void main(String[] args) { + SecretChat.serving(); + } +} \ No newline at end of file diff --git a/src/main/java/com/demo/Message.java b/src/main/java/com/demo/Message.java new file mode 100644 index 0000000..247f888 --- /dev/null +++ b/src/main/java/com/demo/Message.java @@ -0,0 +1,29 @@ +package com.demo; + +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; + +import static java.lang.StringTemplate.STR; + +public class Message { + //表示前端与后端建立连接并发送聊天id + public static final int INIT = 1; + + //表示前端发送的聊天id无法在等待队列找到 + public static final int WAIT = 11; + + //表示前端发送的聊天id可以在等待队列找到 + public static final int JOIN = 12; + + //表示前端取消等待 + public static final int CANCEL = 13; + + //表示对方断开连接 + public static final int DISCONNECT = 20; + + //表示转发该消息 + public static final int CONTENT = 30; + + public static TextWebSocketFrame pairDisconnected(){ + return new TextWebSocketFrame(STR."{\"type\":\{Message.DISCONNECT}}"); + } +} diff --git a/src/main/java/com/demo/SecretChat.java b/src/main/java/com/demo/SecretChat.java new file mode 100644 index 0000000..fa9ff9a --- /dev/null +++ b/src/main/java/com/demo/SecretChat.java @@ -0,0 +1,140 @@ +package com.demo; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; + +import java.util.HashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class SecretChat { + + + //等待队列 + static TwoWayMap pairCodeWithChannelId; + + //已连接 + static HashMap channelId2Channel; + + //聊天配对 + static TwoWayMap channelIdWithChannelId; + + static ScheduledExecutorService scheduler; + + static ObjectMapper objectMapper = new ObjectMapper(); + + public static void serving(){ + pairCodeWithChannelId = new TwoWayMap<>(); + channelId2Channel = new HashMap<>(); + channelIdWithChannelId = new TwoWayMap<>(); + scheduler = Executors.newScheduledThreadPool(1); + + EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new HttpServerCodec()); // HTTP 协议解析,用于握手阶段 + pipeline.addLast(new HttpObjectAggregator(65536)); // HTTP 协议解析,用于握手阶段 + pipeline.addLast(new WebSocketServerCompressionHandler()); // WebSocket 数据压缩扩展 + pipeline.addLast(new WebSocketServerProtocolHandler("/", null, true)); // WebSocket 握手、控制帧处理 + pipeline.addLast(new MyWebSocketServerHandler()); + } + }); + ChannelFuture sync = b.bind(7777).sync(); + sync.channel().closeFuture().sync(); + }catch (Exception e){ + e.printStackTrace(); + }finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + static class MyWebSocketServerHandler extends SimpleChannelInboundHandler { + + @Override + public void channelActive(ChannelHandlerContext ctx){ + channelId2Channel.put(ctx.channel().id().asShortText(), ctx.channel()); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame){ + if (frame instanceof TextWebSocketFrame textWebSocketFrame) { // 此处仅处理 Text Frame + JsonNode node; + try { + node = objectMapper.readTree(textWebSocketFrame.text()); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + //说明不是合法消息 + if(!node.has("type")) + return; + + switch (node.get("type").asInt()){ + case Message.INIT -> { + String pairCode = node.get("pairCode").asText(); + if(pairCodeWithChannelId.containsKey(pairCode)){ //如果等待队列存在该id,则建立连接 + String preChannelId = pairCodeWithChannelId.remove(pairCode); + String channelId = ctx.channel().id().asShortText(); + + channelIdWithChannelId.put(preChannelId, channelId); + + ObjectNode joinNode = objectMapper.createObjectNode(); + joinNode.put("type", Message.JOIN); + TextWebSocketFrame joinMessage = new TextWebSocketFrame(joinNode.toString()); + channelId2Channel.get(preChannelId).writeAndFlush(joinMessage.copy()); + ctx.channel().writeAndFlush(joinMessage); + } else { //否则加入等待队列 + pairCodeWithChannelId.put(pairCode, ctx.channel().id().asShortText()); + scheduler.schedule(() -> { + pairCodeWithChannelId.remove(pairCode); + }, 60, TimeUnit.SECONDS); + + ObjectNode joinNode = objectMapper.createObjectNode(); + joinNode.put("type", Message.WAIT); + TextWebSocketFrame joinMessage = new TextWebSocketFrame(joinNode.toString()); + ctx.channel().writeAndFlush(joinMessage); + } + } + case Message.CONTENT -> channelId2Channel.get(channelIdWithChannelId.get(ctx.channel().id().asShortText())).writeAndFlush(new TextWebSocketFrame(node.get("content").asText())); + case Message.CANCEL -> pairCodeWithChannelId.remove(node.get("pairCode").asText()); + } + } + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx){ + channelId2Channel.remove(ctx.channel().id().asShortText()); + //建立连接的一般分为两种:1.正在等待 2.已经建立聊天 + if(pairCodeWithChannelId.containsKey(ctx.channel().id().asShortText())){ + pairCodeWithChannelId.remove(ctx.channel().id().asShortText()); + } else { + String channelId = channelIdWithChannelId.remove(ctx.channel().id().asShortText()); + if(channelId != null && channelId2Channel.get(channelId) != null) + channelId2Channel.remove(channelId).writeAndFlush(Message.pairDisconnected()); + } + } + } +} + diff --git a/src/main/java/com/demo/TwoWayMap.java b/src/main/java/com/demo/TwoWayMap.java new file mode 100644 index 0000000..20406d7 --- /dev/null +++ b/src/main/java/com/demo/TwoWayMap.java @@ -0,0 +1,18 @@ +package com.demo; + +import java.util.HashMap; + +public class TwoWayMap extends HashMap { + @Override + public String put(String key, String value) { + super.put(key, value); + return super.put(value, key); + } + + @Override + public String remove(Object key) { + String result = super.remove(key); + super.remove(result); + return result; + } +}