手机网站制作的公司推广官网

张小明 2026/3/2 21:34:35
手机网站制作的公司,推广官网,建设网站一般用什么字体,wordpress 文章状态前言:实时聊天需要让客户端与服务端有长链接如图使用netty可以保证服务器和各个客户端保证持续链接实现实时聊天如下我将展示一个示例代码,根据示例代码逐步讲解netty: 代码:Component public class NettyWebSocketStarter implements Runnable {private static final Logger l…前言:实时聊天需要让客户端与服务端有长链接如图使用netty可以保证服务器和各个客户端保证持续链接实现实时聊天如下我将展示一个示例代码,根据示例代码逐步讲解netty:代码:Component public class NettyWebSocketStarter implements Runnable { private static final Logger logger LoggerFactory.getLogger(NettyWebSocketStarter.class); Resource private AppConfig appConfig; Resource private HandlerWebSocket handlerWebSocket; /** * boss线程组用于处理连接 */ private EventLoopGroup bossGroup new NioEventLoopGroup(1); /** * work线程组用于处理消息 */ private EventLoopGroup workerGroup new NioEventLoopGroup(); /** * 资源关闭——在容器销毁时关闭 */ PreDestroy public void close() { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } Override public void run() { try { //创建服务端启动助手 ServerBootstrap serverBootstrap new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); serverBootstrap.channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(new ChannelInitializerChannel() { Override protected void initChannel(Channel channel) { ChannelPipeline pipeline channel.pipeline(); //设置几个重要的处理器 // 对http协议的支持使用http的编码器解码器 pipeline.addLast(new HttpServerCodec()); //聚合解码 httpRequest/htppContent/lastHttpContent到fullHttpRequest //保证接收的http请求的完整性 pipeline.addLast(new HttpObjectAggregator(64 * 1024)); //心跳 long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit // readerIdleTime 读超时事件 即测试段一定事件内未接收到被测试段消息 // writerIdleTime 为写超时时间 即测试端一定时间内想被测试端发送消息 //allIdleTime 所有类型的超时时间 pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)); pipeline.addLast(new HandlerHeartBeat()); //将http协议升级为ws协议对websocket支持 pipeline.addLast(new WebSocketServerProtocolHandler(/ws, null, true, 64 * 1024, true, true, 10000L)); pipeline.addLast(handlerWebSocket); } }); //启动 ChannelFuture channelFuture serverBootstrap.bind(appConfig.getWsPort()).sync(); logger.info(Netty服务端启动成功,端口:{}, appConfig.getWsPort()); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }netty有三大内容:事件循环组,handler(处理器),pipeline(管道)事件循环组的含义就是无限循环去不断的处理io事件,不断循环就达到了实时的目的,因为他永不停歇,只要一发送信息就会处理,另一方就可以接受到信息boss事件循环组就像老板一样,只负责招投资,见别的老板所以他只负责链接woker就处理老板接受后的客户,客服的要求,要工人干什么就去干什么,解决其他的所有事情以下就是woker事件循环组中的情况简略图pipeline管道中有数个handler,每个handler均为双向联通的链表,pipeline就是信息传递的通道,handler就是具体的处理事情的处理器所以代码书写先绑定group(事件循环组)与channle(通道)然后创建pipeline在里面添加handlerhandler:HttpServerCodec() new HttpObjectAggregator(64 * 1024)new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)new HandlerHeartBeat()new WebSocketServerProtocolHandler(/ws, null, true, 64 * 1024, true, true, 10000L)以上除了handlerHeartBeat(自定义心跳处理) 其他均为常见handler 这里不做过多描写,看者可以ai查看用途 无需记忆只要有印象就可以心跳处理器:代码:public class HandlerHeartBeat extends ChannelDuplexHandler { private static final Logger logger LoggerFactory.getLogger(HandlerHeartBeat.class); Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e (IdleStateEvent) evt; if (e.state() IdleState.READER_IDLE) { AttributeString attribute ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString())); String userId attribute.get(); logger.info(用户{}没有发送心跳断开连接, userId); ctx.close(); } else if (e.state() IdleState.WRITER_IDLE) { ctx.writeAndFlush(heart); } } } }心跳检测全过程讲解:new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS) 当事件经过此处理器的时候,若客户端断开此处理器会向下传递IdleState.READER_IDLE事件 new HandlerHeartBeat()处理器检测事件,若断开就书写日志记录心跳断开,并且关闭通道上下文再次回归到事件循环组,事件循环组会不断循环解决io问题,如果事件循环组没有结束,idlestatehandler会一直接收到消息,一但idlestatehandler接受不到则代表事件循环组已经结束,意味着客户端与服务器断开,则需要一个处理器去关闭通道,所以handlerheartbeat应运而生HandlerWebSocket讲解:代码:ChannelHandler.Sharable Component(handlerWebSocket) public class HandlerWebSocket extends SimpleChannelInboundHandlerTextWebSocketFrame { private static final Logger logger LoggerFactory.getLogger(HandlerWebSocket.class); Resource private ChannelContextUtils channelContextUtils; Resource private RedisComponet redisComponet; /** * 当通道就绪后会调用此方法通常我们会在这里做一些初始化操作 * * param ctx * throws Exception */ Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // Channel channel ctx.channel(); logger.info(有新的连接加入。。。); } /** * 当通道不再活跃时连接关闭会调用此方法我们可以在这里做一些清理工作 * * param ctx * throws Exception */ Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.info(有连接已经断开。。。); channelContextUtils.removeContext(ctx.channel()); } /** * 读就绪事件 当有消息可读时会调用此方法我们可以在这里读取消息并处理。 * * param ctx * param textWebSocketFrame * throws Exception */ Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception { //接收心跳 Channel channel ctx.channel(); AttributeString attribute channel.attr(AttributeKey.valueOf(channel.id().toString())); String userId attribute.get(); redisComponet.saveUserHeartBeat(userId); } //用于处理用户自定义的事件 当有用户事件触发时会调用此方法例如连接超时异常等。 Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { WebSocketServerProtocolHandler.HandshakeComplete complete (WebSocketServerProtocolHandler.HandshakeComplete) evt; String url complete.requestUri(); String token getToken(url); if (token null) { ctx.channel().close(); return; } TokenUserInfoDto tokenUserInfoDto redisComponet.getTokenUserInfoDto(token); if (null tokenUserInfoDto) { ctx.channel().close(); return; } /** * 用户加入 */ channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel()); } } private String getToken(String url) { if (StringTools.isEmpty(url) || url.indexOf(?) -1) { return null; } String[] queryParams url.split(\\?); if (queryParams.length 2) { return url; } String[] params queryParams[1].split(); if (params.length ! 2) { return url; } return params[1]; } }此HandlerWebSocket部分方法与心跳处理有重复之处,可以重构一下,主要内容为userEventTriggered,此方法主要为验证token,token对则加入频道,如果错则向上报错下图为私聊简略图,用户1发送消息,服务器会将此消息发送给此频道中除了用户1外的所有人,群组消息同理ChannelContextUtil:代码:Component(channelContextUtils) public class ChannelContextUtils { private static final Logger logger LoggerFactory.getLogger(ChannelContextUtils.class); Resource private RedisComponet redisComponet; public static final ConcurrentMapString, Channel USER_CONTEXT_MAP new ConcurrentHashMap(); public static final ConcurrentMapString, ChannelGroup GROUP_CONTEXT_MAP new ConcurrentHashMap(); Resource private ChatSessionUserMapperChatSessionUser, ChatSessionUserQuery chatSessionUserMapper; Resource private ChatMessageMapperChatMessage, ChatMessageQuery chatMessageMapper; Resource private UserInfoMapperUserInfo, UserInfoQuery userInfoMapper; Resource private UserContactMapperUserContact, UserContactQuery userContactMapper; Resource private UserContactApplyMapperUserContactApply, UserContactApplyQuery userContactApplyMapper; /** * 加入通道 * * param userId * param channel */ public void addContext(String userId, Channel channel) { try { String channelId channel.id().toString(); AttributeKey attributeKey null; if (!AttributeKey.exists(channelId)) { attributeKey AttributeKey.newInstance(channel.id().toString()); } else { attributeKey AttributeKey.valueOf(channel.id().toString()); } channel.attr(attributeKey).set(userId); ListString contactList redisComponet.getUserContactList(userId); for (String groupId : contactList) { if (groupId.startsWith(UserContactTypeEnum.GROUP.getPrefix())) { add2Group(groupId, channel); } } USER_CONTEXT_MAP.put(userId, channel); redisComponet.saveUserHeartBeat(userId); //更新用户最后连接时间 UserInfo updateInfo new UserInfo(); updateInfo.setLastLoginTime(new Date()); userInfoMapper.updateByUserId(updateInfo, userId); //给用户发送一些消息 //获取用户最后离线时间 UserInfo userInfo userInfoMapper.selectByUserId(userId); Long sourceLastOffTime userInfo.getLastOffTime(); //这里避免毫秒时间差所以减去1秒的时间 //如果时间太久只取最近三天的消息数 Long lastOffTime sourceLastOffTime; if (sourceLastOffTime ! null System.currentTimeMillis() - Constants.MILLISECOND_3DAYS_AGO sourceLastOffTime) { lastOffTime System.currentTimeMillis() - Constants.MILLISECOND_3DAYS_AGO; } /** * 1、查询会话信息 查询用户所有会话避免换设备会话不同步 */ ChatSessionUserQuery sessionUserQuery new ChatSessionUserQuery(); sessionUserQuery.setUserId(userId); sessionUserQuery.setOrderBy(last_receive_time desc); ListChatSessionUser chatSessionList chatSessionUserMapper.selectList(sessionUserQuery); WsInitData wsInitData new WsInitData(); wsInitData.setChatSessionList(chatSessionList); /** * 2、查询聊天消息 */ //查询用户的联系人 UserContactQuery contactQuery new UserContactQuery(); contactQuery.setContactType(UserContactTypeEnum.GROUP.getType()); contactQuery.setUserId(userId); ListUserContact groupContactList userContactMapper.selectList(contactQuery); ListString groupIdList groupContactList.stream().map(item - item.getContactId()).collect(Collectors.toList()); //将自己也加进去 groupIdList.add(userId); ChatMessageQuery messageQuery new ChatMessageQuery(); messageQuery.setContactIdList(groupIdList); messageQuery.setLastReceiveTime(lastOffTime); ListChatMessage chatMessageList chatMessageMapper.selectList(messageQuery); wsInitData.setChatMessageList(chatMessageList); /** * 3、查询好友申请 */ UserContactApplyQuery applyQuery new UserContactApplyQuery(); applyQuery.setReceiveUserId(userId); applyQuery.setLastApplyTimestamp(sourceLastOffTime); applyQuery.setStatus(UserContactApplyStatusEnum.INIT.getStatus()); Integer applyCount userContactApplyMapper.selectCount(applyQuery); wsInitData.setApplyCount(applyCount); //发送消息 MessageSendDto messageSendDto new MessageSendDto(); messageSendDto.setMessageType(MessageTypeEnum.INIT.getType()); messageSendDto.setContactId(userId); messageSendDto.setExtendData(wsInitData); sendMsg(messageSendDto, userId); } catch (Exception e) { logger.error(初始化链接失败, e); } } /** * 删除通道连接异常 * * param channel */ public void removeContext(Channel channel) { AttributeString attribute channel.attr(AttributeKey.valueOf(channel.id().toString())); String userId attribute.get(); if (!StringTools.isEmpty(userId)) { USER_CONTEXT_MAP.remove(userId); } redisComponet.removeUserHeartBeat(userId); //更新用户最后断线时间 UserInfo userInfo new UserInfo(); userInfo.setLastOffTime(System.currentTimeMillis()); userInfoMapper.updateByUserId(userInfo, userId); } public void closeContext(String userId) { if (StringTools.isEmpty(userId)) { return; } redisComponet.cleanUserTokenByUserId(userId); Channel channel USER_CONTEXT_MAP.get(userId); USER_CONTEXT_MAP.remove(userId); if (channel ! null) { channel.close(); } } public void sendMessage(MessageSendDto messageSendDto) { UserContactTypeEnum contactTypeEnum UserContactTypeEnum.getByPrefix(messageSendDto.getContactId()); switch (contactTypeEnum) { case USER: send2User(messageSendDto); break; case GROUP: sendMsg2Group(messageSendDto); } } /** * 发送消息给用户 */ private void send2User(MessageSendDto messageSendDto) { String contactId messageSendDto.getContactId(); sendMsg(messageSendDto, contactId); //强制下线 if (MessageTypeEnum.FORCE_OFF_LINE.getType().equals(messageSendDto.getMessageType())) { closeContext(contactId); } } /** * 发送消息到组 */ private void sendMsg2Group(MessageSendDto messageSendDto) { if (messageSendDto.getContactId() null) { return; } ChannelGroup group GROUP_CONTEXT_MAP.get(messageSendDto.getContactId()); if (group null) { return; } group.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(messageSendDto))); //移除群聊 MessageTypeEnum messageTypeEnum MessageTypeEnum.getByType(messageSendDto.getMessageType()); if (MessageTypeEnum.LEAVE_GROUP messageTypeEnum || MessageTypeEnum.REMOVE_GROUP messageTypeEnum) { String userId (String) messageSendDto.getExtendData(); redisComponet.removeUserContact(userId, messageSendDto.getContactId()); Channel channel USER_CONTEXT_MAP.get(userId); if (channel null) { return; } group.remove(channel); } if (MessageTypeEnum.DISSOLUTION_GROUP messageTypeEnum) { GROUP_CONTEXT_MAP.remove(messageSendDto.getContactId()); group.close(); } } private static void sendMsg(MessageSendDto messageSendDto, String reciveId) { if (reciveId null) { return; } Channel sendChannel USER_CONTEXT_MAP.get(reciveId); if (sendChannel null) { return; } //相当于客户而言联系人就是发送人所以这里转换一下再发送,好友打招呼信息发送给自己需要特殊处理 if (MessageTypeEnum.ADD_FRIEND_SELF.getType().equals(messageSendDto.getMessageType())) { UserInfo userInfo (UserInfo) messageSendDto.getExtendData(); messageSendDto.setMessageType(MessageTypeEnum.ADD_FRIEND.getType()); messageSendDto.setContactId(userInfo.getUserId()); messageSendDto.setContactName(userInfo.getNickName()); messageSendDto.setExtendData(null); } else { messageSendDto.setContactId(messageSendDto.getSendUserId()); messageSendDto.setContactName(messageSendDto.getSendUserNickName()); } sendChannel.writeAndFlush(new TextWebSocketFrame(JsonUtils.convertObj2Json(messageSendDto))); } private void add2Group(String groupId, Channel context) { ChannelGroup group GROUP_CONTEXT_MAP.get(groupId); if (group null) { group new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); GROUP_CONTEXT_MAP.put(groupId, group); } if (context null) { return; } group.add(context); } public void addUser2Group(String userId, String groupId) { Channel channel USER_CONTEXT_MAP.get(userId); add2Group(groupId, channel); } }这一段代码并不难理解,主要实现频道的加入与退出,以及消息的分发实现消息分发的方法:有两种较为熟知的方法,第一种为redis实现,第二种为rabbitmq这里展示redis实现代码实现:Component(messageHandler) public class MessageHandlerT { private static final Logger logger LoggerFactory.getLogger(MessageHandler.class); private static final String MESSAGE_TOPIC message.topic; Resource private RedissonClient redissonClient; Resource private ChannelContextUtils channelContextUtils; PostConstruct public void lisMessage() { RTopic rTopic redissonClient.getTopic(MESSAGE_TOPIC); rTopic.addListener(MessageSendDto.class, (MessageSendDto, sendDto) - { logger.info(收到广播消息:{}, JSON.toJSONString(sendDto)); channelContextUtils.sendMessage(sendDto); }); } public void sendMessage(MessageSendDto sendDto) { RTopic rTopic redissonClient.getTopic(MESSAGE_TOPIC); rTopic.publish(sendDto); } }
版权声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!

嘉兴企业网站推广价格低文案

Windows 8网络连接与用户账户管理全攻略 1. 网络连接基础 在使用网络连接之前,了解一些基本术语非常重要,这样在遇到相关术语时就能清楚其含义。网络和共享中心是访问大多数网络配置设置的重要面板,而且大多数无线网络需要密码(也称为安全密钥)才能成功连接。 2. 连接隐…

张小明 2026/1/21 9:08:02 网站建设

阿里云手机版网站建设电商网页设计论文

Windows驱动管理终极指南:DriverStore Explorer高效解决方案 【免费下载链接】DriverStoreExplorer Driver Store Explorer [RAPR] 项目地址: https://gitcode.com/gh_mirrors/dr/DriverStoreExplorer Windows系统驱动管理一直是困扰用户的难题,D…

张小明 2026/1/21 9:07:31 网站建设

国外室内设计网站大全网站工厂弄个网站做外贸如何

Rclone终极配置指南:3分钟掌握云存储同步神器 【免费下载链接】rclone 项目地址: https://gitcode.com/gh_mirrors/rcl/rclone 还在为不同云存储平台之间的文件同步而烦恼吗?面对Google Drive、Dropbox、OneDrive等众多云服务,手动上…

张小明 2026/3/1 0:16:19 网站建设

公司内部网站页面设计网站系统后台

概述随着人工智能技术的快速发展,AI 正在深刻改变数据库管理与操作的方式。从自动化查询生成到性能调优、数据质量监控,再到智能报表分析,AI 已成为现代数据库系统中不可或缺的“智能助手”。本文系统梳理了 AI 在数据库操作中的 8 大核心应用…

张小明 2026/1/21 9:06:29 网站建设

seo企业站收录东莞市建筑业协会

FaceFusion、Stable Diffusion 与 DeepFaceLive:谁才是人脸生成的终极答案?在虚拟主播一夜爆红、AI换脸视频席卷社交平台的今天,我们正站在一个人脸数字化的奇点上。无论是电影工业中悄然替换演员面孔,还是直播镜头里实时变身“数…

张小明 2026/1/21 9:05:58 网站建设

如何做网站流程图网站开发与管理的专业描述

第一章:Open-AutoGLM与共享单车预约平台的融合背景随着城市化进程加快,短途出行需求激增,共享单车作为绿色出行的重要载体,面临调度效率低、用户预约体验差等挑战。传统调度系统依赖历史数据和固定规则,难以应对动态变…

张小明 2026/1/21 9:05:27 网站建设