600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > Java 使用Websocket 与MQ消息队列实现即时消息

Java 使用Websocket 与MQ消息队列实现即时消息

时间:2020-12-25 10:51:34

相关推荐

Java 使用Websocket 与MQ消息队列实现即时消息

Java 使用Websocket 与MQ消息队列实现即时消息

项目需求:根据不同用户账号产生的数据需要即时展示到首页大屏中进行展示,实现方式1:前端短时间内轮训调用后端接口,后端返回最新相关数据进行展示2:使用websocket即时通信,一产生新数据,就立即发送。数据产生有MQ进行推送,保证实时性第一种方式舍弃,频繁请求接口,大部分请求都无效请求,成本过大

实现思路:1:建立websocket连接,缓存连接用户信息,使用session,保证即时同账号不同登录页也能接收2:使用MQ,监听MQ产生的推送消息topic3: MQ监听消息处理类接收消息,根据消息处理业务情况,并根据数据筛选出需要推送到所属用户4:保持在线用户连接,定时任务每30秒发送缓存内还保持连接的用户心跳数据

技术选型使用:netty-websocket详细说明查看:/Yeauty/netty-websocket-spring-boot-starterwebsocket在线测试工具,可在线测试:/tool/chattest

前言

在实际开发使用过程中,产线环境都是使用HTTPS 以及配合 Nginx进行使用,但是在测试环境下,自己则是通过ws 的方式进行连接测试,即:ws://IP地址 + 端口号/websocket所以关于HTTPS下使用 wss 协议的问题,以及配合 Nginx 使用域名方式建立连接不使用 IP地址 + 端口号 连接 WebSocket,因为这种方式不够优雅

ws 和 wss 又是什么鬼?

Websocket使用 ws 或 wss 的统一资源标志符,类似于 HTTP 或 HTTPS其中 wss 表示在 TLS 之上的 Websocket ,相当于 HTTPS 了如:ws:///Websocketwss:///Websocket默认情况下,Websocket 的 ws 协议使用 80 端口;运行在TLS之上时,wss 协议默认使用 443 端口。其实说白了,wss 就是 ws 基于 SSL 的安全传输,与 HTTPS 一样样的道理。如果你的网站是 HTTPS 协议的,那你就不能使用 ws:// 了浏览器会 block 掉连接,和 HTTPS 下不允许 HTTP 请求一样,如下图:

Mixed Content: The page at '/' was loaded over HTTPS, but attempted to connect to the insecure WebSocket endpoint 'ws://x.x.x.x:xxxx/'. This request has been blocked; this endpoint must be available over WSS.这种情况,我们就需要使用 wss:\\ 安全协议了如果把ws的方式来去使用wss的时候

VM512:35 WebSocket connection to 'wss://IP地址:端口号/websocket' failed: Error in connection establishment: net::ERR_SSL_PROTOCOL_ERROR

很明显 SSL 协议错误,说明就是证书问题了。这时候我们一直拿的是 IP地址 + 端口号 这种方式连接 WebSocket 的这没有证书存在,生产环境不可能用 IP地址 + 端口号 这种方式连接 WebSocket 的要用域名方式连接 WebSocket 。

Nginx 配置域名支持 WSS

Nginx配置 HTTPS 域名位置加入配置:

location /websocket {proxy_pass http://backend;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";}

接着拿域名再次连接试一下,不出意外会看 101 状态码:

这样就完成了在 HTTPPS 下以域名方式连接 WebSocket 接下来直接上代码

Maven导包

<dependency><groupId>org.yeauty</groupId><artifactId>netty-websocket-spring-boot-starter</artifactId><version>0.12.0</version></dependency>

JAVA代码

websocket处理类

package com.biz.controller.home;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.TypeReference;import com.redis.provider.impl.StringRedisProvider;import com.core.constant.SecurityConstant;import com.security.def.BoyunLoginUser;import com.security.def.BoyunUserDTO;import com.biz.def.UserInfoWebSocket;import com.biz.def.WebSocketServerDto;import ty.handler.timeout.IdleStateEvent;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import mons.lang3.StringUtils;import org.springframework.beans.factory.annotation.Autowired;import org.yeauty.annotation.*;import org.yeauty.pojo.Session;import java.io.IOException;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.atomic.AtomicInteger;/*** webSocket** @author 夕四* @date -01-24 16:10**/@Slf4j@ServerEndpoint(path = "/ws/{sid}", port = "9011")public class MyWebSocket {/*** 当前在线连接数*/public static final AtomicInteger ONLINE_NUM = new AtomicInteger(0);/*** concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。*/private static ConcurrentHashMap<String, WebSocketServerDto> webSocketMap = new ConcurrentHashMap<>();@Autowiredprivate StringRedisProvider stringRedisProvider;@BeforeHandshakepublic void handshake(Session session) {session.setSubprotocols("stomp");}@OnOpenpublic void onOpen(Session session, @PathVariable String sid) {// 连接数 +1ONLINE_NUM.incrementAndGet();log.info("{}连接成功,当前在线数量:{}", sid, ONLINE_NUM.get());if (StringUtils.isBlank(sid)) {session.close();return;}log.info(">>>>>>>>>>>>>>>>> sid:{}", sid);session.setAttribute(SecurityConstant.TOKEN, sid);log.info(">>>>>>>>>>>>>>>>> sid2:{}", sid);// redis判断sid的用户值String value = stringRedisProvider.get(SecurityConstant.PROJECT_PREFIX + sid);log.info(">>>>>>>>>>>>>>>>> value:{}", value);if (StringUtils.isBlank(value)) {log.debug("{} 未登录", sid);session.close();return;}BoyunLoginUser<BoyunUserDTO> tokenUser = JSON.parseObject(value, new TypeReference<BoyunLoginUser<BoyunUserDTO>>() {});log.info(">>>>>>>>>>>>>>>>> tokenUser:{}", tokenUser.toString());// 绑定自定义属性Long userId = tokenUser.getUser().getUserId();session.setAttribute(SecurityConstant.INNER_USER_ID, userId);WebSocketServerDto webSocketServerDto = webSocketMap.get(sid);if (webSocketServerDto != null) {log.info("关闭");// 如果已存在,先关闭以前的连接,可能会出现覆盖后再添加webSocketServerDto.getSession().close();}webSocketServerDto = new WebSocketServerDto();webSocketServerDto.setSession(session);webSocketMap.put(sid, webSocketServerDto);UserInfoWebSocket.setSessionIdMap(userId, sid);}@OnClosepublic void onClose(Session session) throws IOException {// 连接数 -1ONLINE_NUM.decrementAndGet();String sid = session.getAttribute(SecurityConstant.TOKEN);Long userId = session.getAttribute(SecurityConstant.INNER_USER_ID);log.info("{} 连接关闭,在线数量:{}", sid, ONLINE_NUM.get());webSocketMap.remove(sid);UserInfoWebSocket.delSessionIdMap(userId, sid);}@OnErrorpublic void onError(Session session, Throwable throwable) {log.error("发生错误的连接:{},userId:{}", session.getAttribute(SecurityConstant.TOKEN), session.getAttribute(SecurityConstant.INNER_USER_ID));}@SneakyThrows@OnMessagepublic void onMessage(Session session, String message) {if (!"123456789".equals(message)) {// session.sendText(message);// log.info("发送消息:{}", message);}}@OnBinarypublic void onBinary(Session session, byte[] bytes) {// 不打印心跳数据if (!"123456789".equals(new String(bytes))) {log.info("收到客户端:{} 的消息:{}", session.getAttribute(SecurityConstant.TOKEN), new String(bytes));// session.sendText("123456789");}}@OnEventpublic void onEvent(Session session, Object evt) {if (evt instanceof IdleStateEvent) {IdleStateEvent idleStateEvent = (IdleStateEvent) evt;switch (idleStateEvent.state()) {case READER_IDLE:log.info("读数据闲置");break;case WRITER_IDLE:log.info("写数据闲置");break;case ALL_IDLE:log.info("读、写数据闲置");break;default:break;}}}/*** 发送自定义消息*/public static void sendInfo(String message, String sid) {// 任意一个参数是空,返回falseif (StringUtils.isAnyBlank(message, sid)) {log.debug("参数错误,sid:{},message:{}", sid, message);}log.debug("推送消息到窗口{},推送内容: {}", sid, message);WebSocketServerDto webSocketServer = webSocketMap.get(sid);if (webSocketServer != null) {Session session = webSocketServer.getSession();if (session == null) {log.error("{} 不存在session", sid);return;}// 不活跃了if (!session.isActive()) {// 移除Long userId = session.getAttribute(SecurityConstant.INNER_USER_ID);webSocketMap.remove(sid);UserInfoWebSocket.delSessionIdMap(userId, sid);}session.sendText(message);}}}

WebSocket 需要推送的用户信息缓存

package com.biz.def;import lombok.extern.slf4j.Slf4j;import java.util.*;import java.util.concurrent.locks.ReentrantReadWriteLock;/*** WebSocket信息缓存** @author 夕四* @date -02-23*/@Slf4jpublic class UserInfoWebSocket {/*** key为锁id,value为锁id对应的用户列表*/private static Map<Long, Set<String>> cachedMap = new HashMap<>();private static ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();/*** 根据用户id获取sessionId** @param userId* @return*/public static Set<String> getSessionIdSet(Long userId) {rwlock.readLock().lock();try {return cachedMap.get(userId);} finally {rwlock.readLock().unlock();}}/*** 根据用户id集合获取sessionId集合** @param userIdList* @return*/public static Set<String> getSessionIdSetByUserIdList(List<Long> userIdList) {rwlock.readLock().lock();Set<String> result = new HashSet<>();try {log.info("<<<<<<<<<<<<<<<<<<<< cachedMap:{}",cachedMap);for (Long userId : userIdList) {Set<String> sidList = cachedMap.get(userId);if (sidList != null && !sidList.isEmpty()) {result.addAll(sidList);}}} finally {rwlock.readLock().unlock();}return result;}/*** 获取sessionId集合** @return*/public static Set<String> getSessionIdSet() {rwlock.readLock().lock();Set<String> result = new HashSet<>();try {for (Map.Entry<Long, Set<String>> longSetEntry : cachedMap.entrySet()) {Set<String> sidList = longSetEntry.getValue();result.addAll(sidList);}} finally {rwlock.readLock().unlock();}return result;}/*** 添加用户id对应的sessionId** @param userId* @param sid*/public static void setSessionIdMap(Long userId, String sid) {rwlock.writeLock().lock();try {Set<String> list = cachedMap.get(userId);if (list == null) {list = new HashSet<>();}list.add(sid);cachedMap.put(userId, list);} finally {rwlock.writeLock().unlock();}}/*** 删除某个用户的sessionId** @param userId* @param sid*/public static void delSessionIdMap(Long userId, String sid) {rwlock.writeLock().lock();try {Set<String> set = cachedMap.get(userId);if (set == null) {return;}set.remove(sid);if (set.isEmpty()) {cachedMap.remove(userId);} else {cachedMap.put(userId, set);}} finally {rwlock.writeLock().unlock();}}/*** 删除用户id** @param userId*/public static void delUserId(Long userId) {rwlock.writeLock().lock();try {cachedMap.remove(userId);} finally {rwlock.writeLock().unlock();}}}

webSocket传输对象

package com.biz.def;import org.yeauty.pojo.Session;import java.util.Date;import java.util.Objects;/*** webSocket传输对象*/public class WebSocketServerDto {/*** 与某个客户端的连接会话,需要通过它来给客户端发送数据*/private Session session;/*** 接收sid*/private String sid;/*** 心跳时间*/private Date heartTime;public WebSocketServerDto() {}public WebSocketServerDto(Session session, String sid, Date heartTime) {this.session = session;this.sid = sid;this.heartTime = heartTime;}public Session getSession() {return session;}public void setSession(Session session) {this.session = session;}public String getSid() {return sid;}public void setSid(String sid) {this.sid = sid;}public Date getHeartTime() {return heartTime;}public void setHeartTime(Date heartTime) {this.heartTime = heartTime;}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;WebSocketServerDto that = (WebSocketServerDto) o;return Objects.equals(session, that.session) &&Objects.equals(sid, that.sid) &&Objects.equals(heartTime, that.heartTime);}@Overridepublic int hashCode() {return Objects.hash(session, sid, heartTime);}@Overridepublic String toString() {return "WebSocketServerDto{" +"session=" + session +", sid='" + sid + '\'' +", heartTime=" + heartTime +'}';}}

websocket的推送消息体

package com.bsj.studentcard.upms.pc.biz.def;import lombok.Data;import lombok.NoArgsConstructor;/*** websocket推送消息** @author 夕四* @date -02-23 12:55**/@Data@NoArgsConstructorpublic class WsMsgDataVO<T> {/*** 推送数据*/private T data;/*** 标记标签*/private String tag;public WsMsgDataVO(T data, String tag) {this.data = data;this.tag = tag;}}

MQ的topic监听

package com.bsj.studentcard.upms.pc.biz.config.mq;import org.springframework.cloud.stream.annotation.Input;import org.springframework.cloud.stream.annotation.Output;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.SubscribableChannel;/*** 消息接口** @author 夕四* @date -05-14 16:25**/public interface MySource {/*** 报警回调*/String ALARM_SOS = "alarmSos";/*** 报警回调发送** @return*/@Input(ALARM_SOS)SubscribableChannel alarmSos();}

MQ监听消息处理websock发送

package com.bsj.studentcard.upms.pc.biz.config.mq;import cn.hutool.core.bean.BeanUtil;import cn.hutool.core.collection.CollUtil;import cn.hutool.core.date.DateUtil;import cn.hutool.core.util.StrUtil;import com.alibaba.fastjson.JSONObject;import com.bsj.studentcard.upms.pc.biz.controller.home.MyWebSocket;import com.bsj.studentcard.upms.pc.biz.def.UserInfoWebSocket;import com.bsj.studentcard.upms.pc.biz.def.WsMsgDataVO;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.support.RocketMQHeaders;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHeaders;import org.springframework.messaging.support.MessageBuilder;import org.springframework.scheduling.annotation.Scheduled;import org.ponent;import org.springframework.util.MimeTypeUtils;import java.util.Arrays;import java.util.HashMap;import java.util.List;import java.util.Set;import java.util.function.Function;import java.util.stream.Collectors;/*** 发送消息配置** @author 夕四* @date -05-14 16:29**/@Slf4j@Component@RequiredArgsConstructorpublic class MqSenderService {private final MySource source;/*** 需要MQ推送至websocket的topic** @param msg*/@StreamListener(MySource.ALARM_SOS)public void sosAlarm(Message<String> message) {String result = message.getPayload();if (StrUtil.isEmpty(result)) {log.warn("检测到报警消息为空");return;}List<AlarmLogDTO> oCardCallBackVOS = JSONObject.parseArray(result, AlarmLogDTO.class);if (CollUtil.isEmpty(oCardCallBackVOS)) {log.warn("检测到SOS报警消息为空");return;}log.info("监听到sos报警输出为:{}", result);//接下来的就是业务处理数据List<Long> cardIds = oCardCallBackVOS.stream().map(AlarmLogDTO::getCardId).distinct().collect(Collectors.toList());List<UserDataVO> userDataVOS = CommonBaseCacheForest.listTopUserList(cardIds);HashMap<Long, UserDataVO> commonMap = userDataVOS.stream().collect(Collectors.toMap(UserDataVO::getCardId,Function.identity(), (key1, key2) -> key2, HashMap::new));for (AlarmLogDTO callBack : oCardCallBackVOS) {Long cardId = callBack.getCardId();//根据产生的数据查找对应推送到所属的用户UserDataVO userDataVO = commonMap.get(cardId);List<Long> userIds = userDataVO.getUserId();if (CollUtil.isEmpty(userIds)) {log.info("该数据不属于任何用户,cardId:{}", cardId);continue;}Set<String> sidSet = UserInfoWebSocket.getSessionIdSetByUserIdList(userIds);if (CollUtil.isEmpty(sidSet)) {log.info("当前用户没一个登录,不发送");continue;}//组装推送消息的消息体OAlarmLogPageVO oAlarmLogPageVO = formatData(callBack, commonDTO);WsMsgDataVO<OAlarmLogPageVO> msg = new WsMsgDataVO<OAlarmLogPageVO>(oAlarmLogPageVO, AlarmTypeEnum.SOS_ALARM.name());sidSet.forEach(sid -> MyWebSocket.sendInfo(JSONObject.toJSONString(msg), sid));}}/*** 每30秒对缓存起来的用户保持心跳连接,防止掉线**/@Scheduled(cron = "*/30 * * * * ?")public void keepHeartbeat() {Set<String> sidSet = UserInfoWebSocket.getSessionIdSet();WsMsgDataVO<String> msg = new WsMsgDataVO<String>("心跳连接", "heartBeat");sidSet.forEach(sid -> MyWebSocket.sendInfo(JSONObject.toJSONString(msg), sid));}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。