package com.whyc.ws; import com.whyc.dto.Response; import com.whyc.pojo.MotorState; import com.whyc.pojo.TestPlan; import com.whyc.service.MotorStateService; import com.whyc.service.TestPlanService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArraySet; @ServerEndpoint(value = "/websocket/{testPlanId}") @Component @Slf4j public class TestDataWebSocket { private static MotorStateService motorStateService; private static TestPlanService testPlanService; @Autowired public void setMotorStateService(MotorStateService motorStateService){ TestDataWebSocket.motorStateService = motorStateService; } @Autowired public void setTestPlanService(TestPlanService testPlanService){ TestDataWebSocket.testPlanService = testPlanService; } /** * 与某个客户端的连接对话,需要通过它来给客户端发送消息 */ private Session session; /** * 标识当前连接客户端的参数 试验计划id */ private Integer testPlanId; /** * 用于存所有的连接服务的客户端,这个对象存储是安全的 */ //private static ConcurrentHashMap webSocketSet = new ConcurrentHashMap<>(); private static CopyOnWriteArraySet webSockets = new CopyOnWriteArraySet<>(); @OnOpen public void onOpen(Session session,@PathParam("testPlanId") Integer testPlanId){ System.out.println("试验计划实时数据websocket..."); this.session = session; this.testPlanId = testPlanId; webSockets.add(this); } @OnMessage public void onMessage(Integer message,Session session) { try { this.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } } public void sendMessage(Integer testPlanId) throws IOException{ Thread thread = new Thread("TestDataWebSocket"){ @Override public void run() { while (true) { try { Response model = null; //获取试验计划数据 TestPlan testPlan = testPlanService.getOneById(testPlanId); //获取试验参试设备 String devices = testPlan.getDevices(); String[] devIds = devices.split(","); List list = new ArrayList<>(); for (String deviceId:devIds) { MotorState motorState = motorStateService.getByDeviceId(Integer.valueOf(deviceId)); list.add(motorState); } model.set(1,list); if(session.isOpen()) { session.getBasicRemote().sendObject(model); }else{ break; } sleep(4000); } catch (IOException | InterruptedException | EncodeException e) { e.printStackTrace(); } } } }; thread.start(); } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { System.out.println("closing"); webSockets.remove(this); } /** * 发生错误时调用 * @param session * @param error */ @OnError public void onError(Session session, Throwable error){ System.out.println("发生错误"); error.printStackTrace(); } }