This commit is contained in:
Bryan
2025-11-12 18:25:52 +08:00
parent 2abdacc8f6
commit 3b3646583f
12 changed files with 405 additions and 112 deletions

View File

@@ -87,6 +87,14 @@
<artifactId>json</artifactId>
<version>20231013</version>
</dependency>
<dependency>
<groupId>com.ruoyi</groupId>
<artifactId>ruoyi-system</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>

View File

@@ -0,0 +1,28 @@
package com.ruoyi.redis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
@Component
public class RedisMessagePublisher {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 向指定频道发布消息
* @param channel 频道名称
* @param message 消息内容JSON字符串
*/
public void publish(String channel, String message) {
redisTemplate.convertAndSend(channel, message);
}
/**
* 作用:发布消息到 Redis 频道:将 message 发送到指定的 channel。
* 触发订阅者处理:所有通过 RedisMessageListenerContainer 订阅了该频道的客户端(如其他微服务、后台任务等)会立即收到消息,并调用对应的消息处理器(如 onMessage 方法)。
*
* channel目标频道名称如 "game_room_channel")。
* message要发送的消息可以是任意对象Spring 会自动序列化为字节数组或 JSON
*/
}

View File

@@ -0,0 +1,56 @@
package com.ruoyi.redis;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import com.ruoyi.websocket.server.GameRoomWebSocketHandler;
import java.io.IOException;
import java.util.Map;
@Component
public class RedisMessageSubscriber {
@Autowired
private ObjectMapper objectMapper;//是jackson里面的方法主要处理序列化和反序列化
/**
* 处理从Redis订阅到的消息
*/
public void onMessage(String redisMessage, String channel) {
try {
// 解析消息其中应包含房间ID和要发送的实际消息体
Map<String, Object> messageMap = objectMapper.readValue(redisMessage, Map.class);
String targetRoomId = (String) messageMap.get("roomId");
Map<String, Object> actualMessageToSend = (Map<String, Object>) messageMap.get("message");
// 调用本地广播方法,只发送给本实例内指定房间的用户
broadcastToLocalRoom(targetRoomId, actualMessageToSend);
} catch (Exception e) {
System.err.println("处理Redis消息失败: " + e.getMessage());
}
}
/**
* 仅向本地会话即当前JVM内的WebSocketSession广播消息
*/
private void broadcastToLocalRoom(String roomId, Map<String, Object> message) throws Exception {
// 遍历原Handler中的sessions变量需稍作调整使其可被访问
for (Map.Entry<String, WebSocketSession> entry : GameRoomWebSocketHandler.getSessions().entrySet()) {
String sessionKey = entry.getKey();
if (sessionKey.startsWith(roomId + "_")) { // 判断是否属于目标房间
WebSocketSession session = entry.getValue();
if (session != null && session.isOpen()) {
try {
String jsonMessage = objectMapper.writeValueAsString(message);
session.sendMessage(new TextMessage(jsonMessage));
} catch (IOException e) {
System.err.println("向本地会话发送消息失败: " + e.getMessage());
}
}
}
}
}
}

View File

@@ -0,0 +1,62 @@
package com.ruoyi.redis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@Configuration
public class RedisPubSubConfig {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Autowired
private RedisMessageSubscriber redisMessageSubscriber;
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);// 设置 Redis 连接
// 订阅游戏房间频道
Topic gameRoomTopic = new ChannelTopic("game_room_channel"); // 订阅的频道
// 指定消息处理器即redisMessageSubscriber的onMessage方法
container.addMessageListener(new MessageListenerAdapter(redisMessageSubscriber, "onMessage"), gameRoomTopic);// addMessageListener绑定监听器和频道
/**
* 整工作流程
* 启动应用Spring 初始化 RedisMessageListenerContainer并建立与 Redis 的连接。
* 订阅频道:容器开始监听 game_room_channel 频道。
* 发布消息:其他服务或代码通过 Redis 的 PUBLISH 命令向 game_room_channel 发送消息,例如:
* bash
* PUBLISH game_room_channel "Hello, Game Room!"
* 消息处理:
* Redis 将消息推送给所有订阅者。
* RedisMessageListenerContainer 接收到消息后,调用 RedisMessageSubscriber.onMessage() 方法。
* 消息内容作为参数传递给 onMessage 方法,由开发者自定义处理逻辑(如更新游戏状态、广播通知等)。
*/
/**
* MessageListenerAdapter
* 作用:将 Redis 消息转发到指定的处理器方法(这里是 redisMessageSubscriber.onMessage
* 关键点:
* 第二个参数 "onMessage" 指定处理器方法名(需在 RedisMessageSubscriber 中实现)。
* 默认情况下,消息会以 byte[] 形式传递给处理器Spring 会自动反序列化。
*/
/***
* 发布消息的步骤
* 选择消息中间件:根据需求(如吞吐量、持久性、延迟)选择合适的工具。
* 创建主题Topic或队列Queue
* 主题Topic一对多广播所有订阅者都能收到消息
* 队列Queue点对点消息被一个订阅者消费
* 发布消息:
* 指定主题/队列名称。
* 发送消息内容通常是JSON、Protobuf等格式
*/
return container;
}
}

View File

@@ -1,19 +1,32 @@
package com.ruoyi.websocket.server;
import com.ruoyi.system.domain.ScoreRoomDetail;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
public class GameRoomWebSocketHandler extends TextWebSocketHandler {
@Autowired
private StringRedisTemplate redisTemplate;
// 存储所有连接的会话
private static final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper = new ObjectMapper();
public static Map<String, WebSocketSession> getSessions() {
return Collections.unmodifiableMap(sessions); // 返回不可修改的视图,保证线程安全
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 1. 从路径参数中获取 roomId 和 userId
@@ -40,9 +53,12 @@ public class GameRoomWebSocketHandler extends TextWebSocketHandler {
welcomeMsg.put("message", "欢迎加入房间: " + roomId);
welcomeMsg.put("roomId", roomId);
welcomeMsg.put("userId", userId);
welcomeMsg.put("timestamp", System.currentTimeMillis());
sendJsonMessage(session, welcomeMsg);
broadcastJsonToRoom(roomId,welcomeMsg);
}
@Override
@@ -64,22 +80,42 @@ public class GameRoomWebSocketHandler extends TextWebSocketHandler {
try {
// 解析客户端发送的 JSON 消息
Map<String, Object> clientMessage = objectMapper.readValue(payload, Map.class);
String msgType = (String) clientMessage.getOrDefault("type", "chat");
String msgType = (String) clientMessage.getOrDefault("type", "system");
System.out.println("消息类型" + msgType);
if(msgType.equals("scoreUpdate")){
int Score = (int) clientMessage.get("Score");
int giveUserScore = (int) clientMessage.get("giveUserScore") - Score;
int givedUserScore = (int) clientMessage.get("givedUserScore") + Score;
Map<String, Object> scoreUpdateMsg = new HashMap<>();
scoreUpdateMsg.put("type", msgType);
scoreUpdateMsg.put("giveUserId",clientMessage.get("giveUserId"));
scoreUpdateMsg.put("giveUserScore",giveUserScore);
scoreUpdateMsg.put("givedUserId",clientMessage.get("givedUserId"));
scoreUpdateMsg.put("givedUserScore",givedUserScore);
broadcastJsonToRoom(roomId,scoreUpdateMsg);
}else if(msgType.equals("scoreUpdates")){
Map<String, Object> broadcastMsg = new HashMap<>();
broadcastMsg.put("type", msgType);
broadcastMsg.put("userId", userId);
broadcastMsg.put("roomId", roomId);
broadcastMsg.put("content", clientMessage.get("content"));
broadcastMsg.put("timestamp", System.currentTimeMillis());
}
broadcastJsonToRoom(roomId, broadcastMsg);
else if(msgType.equals("userJoined")){
Map<String, Object> broadcastMsg = new HashMap<>();
broadcastMsg.put("type", msgType);
broadcastMsg.put("userId", userId);
broadcastMsg.put("roomId", roomId);
broadcastMsg.put("nickName", clientMessage.get("nickName"));
broadcastMsg.put("avatars", clientMessage.get("avatars"));
broadcastMsg.put("timestamp", System.currentTimeMillis());
broadcastJsonToRoom(roomId, broadcastMsg);
}
} catch (Exception e) {
System.out.println("解析客户端消息失败,按文本处理: " + e.getMessage());
Map<String, Object> broadcastMsg = new HashMap<>();
broadcastMsg.put("type", "chat");
broadcastMsg.put("type", "error");
broadcastMsg.put("userId", userId);
broadcastMsg.put("roomId", roomId);
broadcastMsg.put("content", payload);
@@ -151,12 +187,14 @@ public class GameRoomWebSocketHandler extends TextWebSocketHandler {
// 辅助方法:广播 JSON 消息到指定房间
private void broadcastJsonToRoom(String roomId, Map<String, Object> message) throws Exception {
String jsonMessage = objectMapper.writeValueAsString(message);
for (Map.Entry<String, WebSocketSession> entry : sessions.entrySet()) {
if (entry.getKey().startsWith(roomId + "_")) {
WebSocketSession session = entry.getValue();
if (session.isOpen()) {
session.sendMessage(new TextMessage(jsonMessage));
System.out.println(message);
sendJsonMessage(session, message);
System.out.println(111);
}
}
}