Springboot中统一启动多个socketIO

xing-chen-d / 2024-10-09 / 原文

前言

这篇随笔属实没想到一个好名字,起因是在项目中遇到了一个springboot服务会发出多个socket服务的场景,而且我们使用的是socketIO服务,为了减少调试工作和重复的开发工作,让开发在项目中专注于业务编写,因此封装了一个在启动springboot服务时,自动创建socketIONamespace的逻辑

依赖

在使用此依赖时,我的项目版本为:Static Badge Static Badge
因为是要跟公司其他团队的架构保持一致,所以我们的socketIo的版本偏低,使用的是 Static Badge

<dependency>  
    <groupId>com.corundumstudio.socketio</groupId>  
    <artifactId>netty-socketio</artifactId>  
    <version>1.7.19</version>  
</dependency>  
<dependency>  
    <groupId>io.socket</groupId>  
    <artifactId>socket.io-client</artifactId>  
    <version>1.0.0</version>  
</dependency>

逻辑

首先需要添加一个socketIo的配置类

import com.corundumstudio.socketio.HandshakeData;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class SocketIoConfig {
  
    @Value("${socketio.port}")  
    private Integer port;  
  
    @Value("${socketio.bossCount}")  
    private int bossCount;  
  
    @Value("${socketio.workCount}")  
    private int workCount;  
  
    @Value("${socketio.allowCustomRequests}")  
    private boolean allowCustomRequests;  
  
    @Value("${socketio.upgradeTimeout}")  
    private int upgradeTimeout;  
  
    @Value("${socketio.pingTimeout}")  
    private int pingTimeout;  
  
    @Value("${socketio.pingInterval}")  
    private int pingInterval;  
  
    @Bean  
    public SocketIOServer socketIoServer() {  
        SocketConfig socketConfig = new SocketConfig();  
        socketConfig.setTcpNoDelay(true);  
        socketConfig.setSoLinger(0);
        // 因为使用了Spring的
        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();  
        config.setSocketConfig(socketConfig);  
        // 设置授权监听器
        config.setAuthorizationListener(new AuthorizationListener() {
            @Override
            public boolean isAuthorized(HandshakeData data) {
                // 在这里添加你的授权逻辑
                // 例如,检查握手数据中的 token
                String token = data.getSingleUrlParam("token");
                return "valid_token".equals(token);
            }
        });
        config.setPort(port);
        config.setBossThreads(bossCount);
        config.setWorkerThreads(workCount);
        config.setAllowCustomRequests(allowCustomRequests);
        config.setUpgradeTimeout(upgradeTimeout);
        config.setPingTimeout(pingTimeout);
        config.setPingInterval(pingInterval);
        return new SocketIOServer(config);
    }
}

这些类的配置,我是使用的 application.yml 进行管理,在 application.yml 中添加了相应的配置.
对于授权监听器这块的逻辑,我一直没有真正的使用起来。我对于token的处理是放在了建立socket链接后进行处理

需要添加一个策略接口 SocketIoStrategy和一个初始化加载的类 SocketInitHandle

public interface SocketIoStrategy {  
  
    /**  
     * 定义命名空间的Url  
     *     * @return 命名空间url  
     */
	String defineNamespaceUrl();  
  
    /**  
     * 链接后方法  
     *  
     * @param client socket客户端信息  
     */  
    void connected(SocketIOClient client);
  
    /**  
     * 自定义监听器  
     *  
     * @param socketIoNamespace socketIo命名空间  
     */  
    void customListener(SocketIONamespace socketIoNamespace);
  
    /**  
     * 链接后方法  
     *  
     * @param client socket客户端信息  
     */  
    void disconnect(SocketIOClient client);
  
}
import com.corundumstudio.socketio.SocketIONamespace;  
import com.corundumstudio.socketio.SocketIOServer;  
import lombok.extern.slf4j.Slf4j;  
import org.springframework.context.ApplicationContext;  
import org.springframework.context.ApplicationListener;  
import org.springframework.context.event.ContextRefreshedEvent;  
import org.springframework.stereotype.Service;  
  
import javax.annotation.PreDestroy;  
import javax.annotation.Resource;

@Slf4j
@Service  
public class SocketInitHandle implements ApplicationListener<ContextRefreshedEvent> {  
  
	@Resource
	private SocketIOServer socketIoServer;
  
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {  
        ApplicationContext applicationContext = event.getApplicationContext();  
        // 获取继承了ISocketIoService的bean
        applicationContext.getBeansOfType(SocketIoStrategy.class).forEach((beanName, socketService) -> {
            log.info("{} socket io namespace:{}", beanName, socketService.defineNamespaceUrl());
            String namespaceUrl = socketService.defineNamespaceUrl();
            SocketIONamespace socketIoNamespace = socketIoServer.addNamespace(namespaceUrl);
            socketIoNamespace.addConnectListener(socketService::connected);
            socketIoNamespace.addDisconnectListener(socketService::disconnect);
            socketService.customListener(socketIoNamespace);
        });
  
        // 启动服务
        socketIoServer.start();
    }
  
    /**  
     * Spring IoC容器在销毁SocketIOServiceImpl Bean之前关闭,避免重启项目服务端口占用问题  
     */  
    @PreDestroy  
    private void autoStop() {  
        stop();
    }
  
    public void stop() {
        if (socketIoServer != null) {
            socketIoServer.stop();
            socketIoServer = null;
        }
    }
}

这样,在项目启动时,会自动识别到实现了SocketIoStrategy这个接口的类,下面是一个这个接口实现的样例


@Slf4j
@Service
public class DemoSocketIoStrategyImpl implements SocketIoStrategy {  
  
    /**  
     * 存放已连接的客户端  
     */  
    private static final Map<String, SocketUtil.SocketClientInfo> CLIENT_MAP = new ConcurrentHashMap<>();  
  
    @Override  
    public String defineNamespaceUrl() {  
        return "/demo";  
    }  
  
    @Override  
    public void connected(SocketIOClient client) {  
        SocketUtil.SocketClientInfo clientInfo = SocketUtil.formationSocketInfoWithClient(client);  
        bizLog.info("************ 客户端:{} 已连接 ************", clientInfo.getClientId());  
        // 自定义事件`connected` -> 与客户端通信  (也可以使用内置事件,如:Socket.EVENT_CONNECT)  
        client.sendEvent(Socket.EVENT_CONNECT, "成功连接");  
        CLIENT_MAP.put(clientInfo.getClientId(), clientInfo);  
    }  
  
    @Override  
    public void customListener(SocketIONamespace socketIoNamespace) {  
        socketIoNamespace.addEventListener("PUSH_DATA_EVENT", String.class, (client, data, ackSender) -> {  
            // 客户端推送`client_info_event`事件时,onData接受数据,这里是string类型的json数据,还可以为Byte[],object其他类型  
            String clientId = SocketUtil.getClientIdByClient(client);  
            bizLog.info("client{} push msg:{}", clientId, data);  
        });  
    }  
  
    @Override  
    public void disconnect(SocketIOClient client) {  
        String clientId = SocketUtil.getClientIdByClient(client);  
        bizLog.info("{} *********************** 客户端已断开连接", clientId);  
        if (clientId != null) {  
            CLIENT_MAP.remove(clientId);  
            client.disconnect();  
        }  
    }  
  
    public void pushBroadcastMessages(String eventType, String msgContent) {  
        CLIENT_MAP.forEach((clientId, clientInfo) -> {  
            bizLog.info("send fence msg to {}, content:{}", clientId, msgContent);  
            clientInfo.getClient().sendEvent(eventType, msgContent);  
        });  
    }  
}

这个类实现了socketIO策略,还提供了一个广播的实现方法,在服务中需要广播消息时,执行消息的类型和内容即可发送

前端逻辑

下面是一个基于react的使用逻辑


const mySocket = useRef<any>(null);

const startSocket = () => {
    if (!mySocket) {
        // socketIoUrl是对应后端服务的域名,/demo是对应链接的链接路径,用于区分一个服务中的多个socket逻辑
        mySocket = io(`${socketIoUrl}/demo`, {
            reconnectionDelayMax: 10000,
            query: {
                // 一些在链接的时候需要携带的参数
            },
        });
        
        mySocket.on('connect', (ev: any) => {
            console.log('socket 连接成功', ev);
            // 在链接成功后,发送一个emitEventType类型的事件消息
            mySocket.emit('emitEventType',"emit data")
        });
    
        // 此处监听后端服务的eventType类型的事件的数据
        mySocket.on('eventType', (data: any) => {
            // 接收到数据后的逻辑
        });
    }
};


useEffect(()=>{
    // 在加载的时候建立链接
    startSocket();
    return () => {
        //断开socket连接
        if (mySocket.current) {
            mySocket.current.closeSocket();
        }
    };
},[])