whyczh
2021-04-07 42536733ddfce247d5671d321ea1ec1c8b37f36a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package com.whyc.ws;
 
import com.whyc.dto.Response;
import com.whyc.service.*;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
 
/**
 * webSocket实现与客户端消息交互
 *
 * 页面端,三个电源数据的实时监控,可以在打开第一个电源实时页面的时候,触发ws连接开启,点中某个电源时,页面ws发送message:传入参数:实时类型和电源id.
 * 一个WsSession一次开启ws,ws连接其实与http极为相似
 */
@ServerEndpoint(value = "/websocket",encoders = WebSocketEncoder.class)
@Component
public class DeviceWebSocket {
 
    private static DynamicLoadStateService dynamicLoadStateService;
    private static MotorStateService motorStateService;
    private static ElectricStateService electricStateService;
    private static Electric2MWStateService electric2MWStateService;
 
    @Autowired
    public void setDynamicLoadStateService(DynamicLoadStateService dynamicLoadStateService) {
        DeviceWebSocket.dynamicLoadStateService = dynamicLoadStateService;
    }
 
    @Autowired
    public void setMotorStateService(MotorStateService motorStateService) {
        DeviceWebSocket.motorStateService = motorStateService;
    }
 
    @Autowired
    public void setElectricStateService(ElectricStateService electricStateService) {
        DeviceWebSocket.electricStateService = electricStateService;
    }
 
    @Autowired
    public void setElectric2MWStateService(Electric2MWStateService electric2MWStateService) {
        DeviceWebSocket.electric2MWStateService = electric2MWStateService;
    }
 
    private Integer userId;
 
    //concurrent包的线程安全,用来存放每个客户端对应的MyWebSocket对象。
    //若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
//    private static Map<String, WebSocket> webSockets = new ConcurrentHashMap<>();
    private static CopyOnWriteArraySet<DeviceWebSocket> webSockets = new CopyOnWriteArraySet<>();
 
    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
 
    /**
     * 连接建立成功调用的方法
     *
     * @param session    可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    @OnOpen
    public void onOpen(Session session) {
        System.out.println("websocket1...");
        this.session = session;
        webSockets.add(this);
    }
 
    /**
     * 收到客户端消息后调用的方法
     * 连续监控到3次消息,会启动3个线程,这样导致 ws到页面 是3个响应数据;所以需要关闭ws后再打开ws 或者...
     * @param message 客户端发送过来的消息
     * @param session 可选的参数
     */
    @OnMessage
    public void onMessage(String message,Session session) {
        try {
            this.sendMessage(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
        /*try {
            this.sendMessage(message);
        } catch (IOException e) {
            e.printStackTrace();
        }*/
        /*System.out.println(session.getId());
 
        final String powerDeviceId = message;
        final Session sessionThread = session;
        Thread thread = new Thread("Thread_PowerACDCData"){
            @Override
            public void run() {
                while (true) {
                    try {
                        sleep(4000);
                        //获取powerACData记录
                        ServiceModel model =service.get(powerDeviceId);
                        if(sessionThread.isOpen()) {
                            sessionThread.getBasicRemote().sendObject(model);
                        }else{
                            break;
                        }
                    } catch (IOException | InterruptedException | EncodeException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        thread.start();*/
        /*//群发消息
        for(PowerACDCDataWebSocket item: webSockets){
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                e.printStackTrace();
                continue;
            }
        }*/
    }
 
    /**
     * 该方法没有用注解,是根据自己需要添加的方法。在自己的业务中调用,发送消息给前端。
     */
    public void sendMessage(String deviceId) throws IOException{
        Thread thread = new Thread("Thread_DeviceManage"){
            @Override
            public void run() {
                while (true) {
                    try {
                        Response model = null;
                        //获取记录
                        if(deviceId.startsWith("1")) {
                            model = dynamicLoadStateService.get(Integer.parseInt(deviceId));
                        }
                        else if(deviceId.startsWith("2")) {
                            model = motorStateService.getById(Integer.parseInt(deviceId));
                        }
                        else if(deviceId.startsWith("3")) {
                            model = electricStateService.get(Integer.parseInt(deviceId));
                        }
                        else if(deviceId.startsWith("4")) {
                            model = electric2MWStateService.get(Integer.parseInt(deviceId));
                        }
                        if(session.isOpen()) {
                            session.getBasicRemote().sendObject(model);
                        }else{
                            break;
                        }
                        sleep(4000);
                    } catch (IOException | InterruptedException | EncodeException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        thread.start();
    }
 
    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        System.out.println("closing");
        webSockets.remove(this);
    }
 
    /**
     * 发生错误时调用
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error){
        System.out.println("发生错误");
        error.printStackTrace();
    }
 
}