使用netty实现一个类似于微信的聊天功能
生活随笔
收集整理的這篇文章主要介紹了
使用netty实现一个类似于微信的聊天功能
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1.maven依賴
? ? ? ?<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.25.Final</version></dependency>2.netty代碼
部分業務邏輯代碼已省略。
后端框架為SpringBoot+MyBatis+Spring MVC
ChatHandler.java
/*** 處理消息的handler* TextWebSocketFrame: 在netty中,是用于為websocket專門處理文本的對象,frame是消息的載體*/ public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { ?/*** 用來保存所有的客戶端連接*/private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); ?/*** 當Channel中有新的事件消息會自動調用*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {// 當接收到數據后會自動調用 ?// 獲取客戶端發送過來的文本消息String text = msg.text();System.out.println("接收到消息數據為:" + text); ?Message message = JSON.parseObject(text, Message.class); ?// 通過SpringUtil工具類獲取Spring上下文容器ChatRecordService chatRecordService = SpringUtil.getBean(ChatRecordService.class); ?switch (message.getType()) {// 處理客戶端連接的消息case 0:// 建立用戶與通道的關聯String userid = message.getChatRecord().getUserid();UserChannelMap.put(userid, ctx.channel());System.out.println("建立用戶:" + userid + "與通道" + ctx.channel().id() + "的關聯");UserChannelMap.print();break;// 處理客戶端發送好友消息case 1:System.out.println("接收到用戶消息");// 將聊天消息保存到數據庫TbChatRecord chatRecord = message.getChatRecord();chatRecordService.insert(chatRecord); ?// 如果發送消息好友在線,可以直接將消息發送給好友Channel channel = UserChannelMap.get(chatRecord.getFriendid());if (channel != null) {channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(message)));} else {// 如果不在線,暫時不發送System.out.println("用戶" + chatRecord.getFriendid() + "不在線");}break;// 處理客戶端的簽收消息case 2:// 將消息記錄設置為已讀chatRecordService.updateStatusHasRead(message.getChatRecord().getId());break;case 3:// 接收心跳消息System.out.println("接收到心跳消息:" + JSON.toJSONString(message));break;default:} ?} ?/*** 當有新的客戶端連接服務器之后,會自動調用這個方法*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {// 將新的通道加入到clientsclients.add(ctx.channel());} ?@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());ctx.channel().close();} ?@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {System.out.println("關閉通道");UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());UserChannelMap.print();} }HearBeatHandler.java
/** * 有時候Netty并不能在到客戶端關閉時,自動關閉對應的通道資源。所以需要一個心跳機制,去檢測每個通道是否空閑。 * ? 如果空閑超過一定時間,就需要將對應客戶端的通道資源關閉。客戶端需要每隔一段時間發送一條消息,用來保持心跳。 * ? 該代碼中約定message.getType==3為心跳消息,不需要處理 */ public class HearBeatHandler extends ChannelInboundHandlerAdapter { ?@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent idleStateEvent = (IdleStateEvent) evt; ?if (idleStateEvent.state() == IdleState.READER_IDLE) {System.out.println("讀空閑事件觸發...");} else if (idleStateEvent.state() == IdleState.WRITER_IDLE) {System.out.println("寫空閑事件觸發...");} else if (idleStateEvent.state() == IdleState.ALL_IDLE) {System.out.println("---------------");System.out.println("讀寫空閑事件觸發");System.out.println("關閉通道資源");ctx.channel().close();}}} }Message.java
該實體為和前端約定好的格式,通過不同的消息類型,達到不同的功能。
public class Message {private Integer type; // 消息類型private TbChatRecord chatRecord; ? ?// 聊天消息private Object ext; ?// 擴展消息字段 ?public Integer getType() {return type;} ?public void setType(Integer type) {this.type = type;} ?public TbChatRecord getChatRecord() {return chatRecord;} ?public void setChatRecord(TbChatRecord chatRecord) {this.chatRecord = chatRecord;} ?public Object getExt() {return ext;} ?public void setExt(Object ext) {this.ext = ext;} } ?TbChatRecord.java
public class TbChatRecord {private String id; ?private String userid; ?private String friendid; ?private Integer hasRead; ?private Date createtime; ?private Integer hasDelete; ?private String message; }NettyListener.java
/*** 服務啟動監聽器*/ @Component public class NettyListener implements ApplicationListener<ContextRefreshedEvent> { ?@Autowiredprivate WebSocketServer websocketServer; ?@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {if (event.getApplicationContext().getParent() == null) {try {websocketServer.start();} catch (Exception e) {e.printStackTrace();}}} }UserChannelMap.java
/*** 建立用戶ID與通道的關聯*/ public class UserChannelMap {// 用戶保存用戶id與通道的Map對象private static Map<String, Channel> userChannelMap; ?static {userChannelMap = new HashMap<String, Channel>();} ?/*** 添加用戶id與channel的關聯* @param userid* @param channel*/public static void put(String userid, Channel channel) {userChannelMap.put(userid, channel);} ?/*** 根據用戶id移除用戶id與channel的關聯* @param userid*/public static void remove(String userid) {userChannelMap.remove(userid);} ?/*** 根據通道id移除用戶與channel的關聯* @param channelId 通道的id*/public static void removeByChannelId(String channelId) {if(!StringUtils.isNotBlank(channelId)) {return;} ?for (String s : userChannelMap.keySet()) {Channel channel = userChannelMap.get(s);if(channelId.equals(channel.id().asLongText())) {System.out.println("客戶端連接斷開,取消用戶" + s + "與通道" + channelId + "的關聯");userChannelMap.remove(s);break;}}} ? ?// 打印所有的用戶與通道的關聯數據public static void print() {for (String s : userChannelMap.keySet()) {System.out.println("用戶id:" + s + " 通道:" + userChannelMap.get(s).id());}} ?/*** 根據好友id獲取對應的通道* @param friendid 好友id* @return Netty通道*/public static Channel get(String friendid) {return userChannelMap.get(friendid);} }WebsocketInitializer.java
/*** 用于在某個Channel注冊到EventLoop后,對這個Channel執行一些初始化操作* ChannelInitializer雖然會在一開始會被注冊到Channel相關的pipeline里,* 但是在初始化完成之后,ChannelInitializer會將自己從pipeline中移除,不會影響后續的操作*/ public class WebsocketInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline(); ?// ------------------// 用于支持Http協議// ------------------ ?// websocket基于http協議,需要有http的編解碼器pipeline.addLast(new HttpServerCodec());// 對寫大數據流的支持pipeline.addLast(new ChunkedWriteHandler());// 添加對HTTP請求和響應的聚合器:只要使用Netty進行Http編程都需要使用// 對HttpMessage進行聚合,聚合成FullHttpRequest或者FullHttpResponse// 在netty編程中都會使用到Handlerpipeline.addLast(new HttpObjectAggregator(1024 * 64)); ?pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); ?// 添加Netty空閑超時檢查的支持// 1. 讀空閑超時(超過一定的時間會發送對應的事件消息)// 2. 寫空閑超時// 3. 讀寫空閑超時pipeline.addLast(new IdleStateHandler(4, 8, 12)); ?pipeline.addLast(new HearBeatHandler());// 添加自定義的handlerpipeline.addLast(new ChatHandler()); ?} }WebSocketServer.java
/*** netty的服務器*/ @Component public class WebSocketServer { ?private EventLoopGroup bossGroup; ? ? ? // 主線程池private EventLoopGroup workerGroup; ? ? // 工作線程池private ServerBootstrap server; ? ? ? ? // 服務器private ChannelFuture future; ? ? ? ? ? // 回調 ?public void start() {future = server.bind(9001);System.out.println("netty server - 啟動成功");} ?public WebSocketServer() {bossGroup = new NioEventLoopGroup();workerGroup = new NioEventLoopGroup(); ?server = new ServerBootstrap();server.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new WebsocketInitializer());} }SpringUtil.java
/*** @Description: 提供手動獲取被spring管理的bean對象*/ @Component public class SpringUtil implements ApplicationContextAware {private static ApplicationContext applicationContext; ?@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {if (SpringUtil.applicationContext == null) {SpringUtil.applicationContext = applicationContext;}} ?// 獲取applicationContextpublic static ApplicationContext getApplicationContext() {return applicationContext;} ?// 通過name獲取 Bean.public static Object getBean(String name) {return getApplicationContext().getBean(name);} ?// 通過class獲取Bean.public static <T> T getBean(Class<T> clazz) {return getApplicationContext().getBean(clazz);} ?// 通過name,以及Clazz返回指定的Beanpublic static <T> T getBean(String name, Class<T> clazz) {return getApplicationContext().getBean(name, clazz);} ? }?
總結
以上是生活随笔為你收集整理的使用netty实现一个类似于微信的聊天功能的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用netty搭建一个简单的聊天室
- 下一篇: Spring自定义注解+redis实现接