whycxzp
2021-03-25 435353ecd73fff1a8903cdcbb981731d45f11cb3
引入WebSocket,更新WebSocket示例接口
3个文件已添加
2个文件已修改
209 ■■■■■ 已修改文件
pom.xml 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/whyc/config/WebSocketConfig.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/whyc/service/DeviceManageService.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/whyc/ws/DeviceManageWebSocket.java 155 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/whyc/ws/WebSocketEncoder.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pom.xml
@@ -127,6 +127,11 @@
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--websocket-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
    </dependencies>
    <build>
src/main/java/com/whyc/config/WebSocketConfig.java
New file
@@ -0,0 +1,15 @@
package com.whyc.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
src/main/java/com/whyc/service/DeviceManageService.java
@@ -11,9 +11,8 @@
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
@Service
@Service("deviceManageService")
public class DeviceManageService {
    @Resource
src/main/java/com/whyc/ws/DeviceManageWebSocket.java
New file
@@ -0,0 +1,155 @@
package com.whyc.ws;
import com.whyc.dto.Response;
import com.whyc.service.DeviceManageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
/**
 * webSocket实现与客户端消息交互
 *
 * 页面端,三个电源数据的实时监控,可以在打开第一个电源实时页面的时候,触发ws连接开启,点中某个电源时,页面ws发送message:传入参数:实时类型和电源id.
 * 一个WsSession一次开启ws,ws连接其实与http极为相似
 */
@ServerEndpoint(value = "/websocket/deviceManage/{userId}",encoders = WebSocketEncoder.class)
@Component
public class DeviceManageWebSocket {
    private static DeviceManageService service;
    @Autowired
    public void setService(DeviceManageService service) {
        DeviceManageWebSocket.service = service;
    }
    private Integer userId;
    //concurrent包的线程安全,用来存放每个客户端对应的MyWebSocket对象。
    //若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
//    private static Map<String, WebSocket> webSockets = new ConcurrentHashMap<>();
    private static CopyOnWriteArraySet<DeviceManageWebSocket> webSockets = new CopyOnWriteArraySet<>();
    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    /**
     * 连接建立成功调用的方法
     *
     * @param session    可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") Integer userId) {
        System.out.println("websocket1...");
        this.session = session;
        this.userId = userId;
        webSockets.add(this);
    }
    /**
     * 收到客户端消息后调用的方法
     * 连续监控到3次消息,会启动3个线程,这样导致 ws到页面 是3个响应数据;所以需要关闭ws后再打开ws 或者...
     * @param message 客户端发送过来的消息
     * @param session 可选的参数
     */
    @OnMessage
    public void onMessage(String message,Session session) {
        try {
            this.sendMessage(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
        /*try {
            this.sendMessage(message);
        } catch (IOException e) {
            e.printStackTrace();
        }*/
        /*System.out.println(session.getId());
        final String powerDeviceId = message;
        final Session sessionThread = session;
        Thread thread = new Thread("Thread_PowerACDCData"){
            @Override
            public void run() {
                while (true) {
                    try {
                        sleep(4000);
                        //获取powerACData记录
                        ServiceModel model =service.get(powerDeviceId);
                        if(sessionThread.isOpen()) {
                            sessionThread.getBasicRemote().sendObject(model);
                        }else{
                            break;
                        }
                    } catch (IOException | InterruptedException | EncodeException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        thread.start();*/
        /*//群发消息
        for(PowerACDCDataWebSocket item: webSockets){
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                e.printStackTrace();
                continue;
            }
        }*/
    }
    /**
     * 该方法没有用注解,是根据自己需要添加的方法。在自己的业务中调用,发送消息给前端。
     */
    public void sendMessage(String pageInf) throws IOException{
        int pageNum= Integer.parseInt(pageInf.split("-")[0]);
        int pageSize= Integer.parseInt(pageInf.split("-")[1]);
        Thread thread = new Thread("Thread_DeviceManage"){
            @Override
            public void run() {
                while (true) {
                    try {
                        //获取powerACData记录
                        Response model =service.getAll(pageNum,pageSize);
                        if(session.isOpen()) {
                            session.getBasicRemote().sendObject(model);
                        }else{
                            break;
                        }
                        sleep(4000);
                    } catch (IOException | InterruptedException | EncodeException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        thread.start();
    }
    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        System.out.println("closing");
        webSockets.remove(this);
    }
    /**
     * 发生错误时调用
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error){
        System.out.println("发生错误");
        error.printStackTrace();
    }
}
src/main/java/com/whyc/ws/WebSocketEncoder.java
New file
@@ -0,0 +1,31 @@
package com.whyc.ws;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.whyc.dto.Response;
import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;
/**
 * 编译器
 */
public class WebSocketEncoder implements Encoder.Text<Response> {
    @Override
    public String encode(Response o) throws EncodeException {
        return new GsonBuilder().create().toJson(o);
    }
    @Override
    public void init(EndpointConfig endpointConfig) {
    }
    @Override
    public void destroy() {
    }
}