whyczh
2021-04-07 9f83bdaebfd57892bc48857c286780a01efeb581
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package com.whyc.ws;
 
import com.whyc.dto.Response;
import com.whyc.service.DeviceInfService;
import com.whyc.service.DeviceManageService;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
 
/**
 * webSocket实现与客户端消息交互
 *
 * 页面端,三个电源数据的实时监控,可以在打开第一个电源实时页面的时候,触发ws连接开启,点中某个电源时,页面ws发送message:传入参数:实时类型和电源id.
 * 一个WsSession一次开启ws,ws连接其实与http极为相似
 */
@ServerEndpoint(value = "/websocket/deviceManage/{userId}",encoders = WebSocketEncoder.class)
@Component
@Api(tags = "设备实时状态")
public class DeviceInfWebSocket {
 
    private static DeviceInfService service;
 
    @Autowired
    public void setService(DeviceInfService service) {
        DeviceInfWebSocket.service = service;
    }
 
    private Integer userId;
 
    //concurrent包的线程安全,用来存放每个客户端对应的MyWebSocket对象。
    //若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
//    private static Map<String, WebSocket> webSockets = new ConcurrentHashMap<>();
    private static CopyOnWriteArraySet<DeviceInfWebSocket> webSockets = new CopyOnWriteArraySet<>();
 
    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
 
    /**
     * 连接建立成功调用的方法
     *
     * @param session    可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") Integer userId) {
        System.out.println("websocket1...");
        this.session = session;
        this.userId = userId;
        webSockets.add(this);
    }
 
    /**
     * 收到客户端消息后调用的方法
     * 连续监控到3次消息,会启动3个线程,这样导致 ws到页面 是3个响应数据;所以需要关闭ws后再打开ws 或者...
     * @param message 客户端发送过来的消息
     * @param session 可选的参数
     */
    @OnMessage
    public void onMessage(String message,Session session) {
        try {
            this.sendMessage(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
        /*try {
            this.sendMessage(message);
        } catch (IOException e) {
            e.printStackTrace();
        }*/
        /*System.out.println(session.getId());
 
        final String powerDeviceId = message;
        final Session sessionThread = session;
        Thread thread = new Thread("Thread_PowerACDCData"){
            @Override
            public void run() {
                while (true) {
                    try {
                        sleep(4000);
                        //获取powerACData记录
                        ServiceModel model =service.get(powerDeviceId);
                        if(sessionThread.isOpen()) {
                            sessionThread.getBasicRemote().sendObject(model);
                        }else{
                            break;
                        }
                    } catch (IOException | InterruptedException | EncodeException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        thread.start();*/
        /*//群发消息
        for(PowerACDCDataWebSocket item: webSockets){
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                e.printStackTrace();
                continue;
            }
        }*/
    }
 
    /**
     * 该方法没有用注解,是根据自己需要添加的方法。在自己的业务中调用,发送消息给前端。
     */
    public void sendMessage(String pageInf) throws IOException{
        Thread thread = new Thread("Thread_DeviceManage"){
            @Override
            public void run() {
                while (true) {
                    try {
                        //获取powerACData记录
                        Response model =service.getAll();
                        if(session.isOpen()) {
                            session.getBasicRemote().sendObject(model);
                        }else{
                            break;
                        }
                        sleep(4000);
                    } catch (IOException | InterruptedException | EncodeException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        thread.start();
    }
 
    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        System.out.println("closing");
        webSockets.remove(this);
    }
 
    /**
     * 发生错误时调用
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error){
        System.out.println("发生错误");
        error.printStackTrace();
    }
 
}