| | |
| | | /** |
| | | * 存放所有在线的客户端 |
| | | */ |
| | | private volatile static Map<String, WebSocketSession> clients = new ConcurrentHashMap<>(); |
| | | public static Map<String, WebSocketSession> clients = new ConcurrentHashMap<>(); |
| | | |
| | | private WebSocketSession session; |
| | | |
| | | public Process process; |
| | | |
| | | private String ip; |
| | | |
| | | private static Map<String,Process> processMap = new ConcurrentHashMap<>(); |
| | | public static Map<String,Process> processMap = new ConcurrentHashMap<>(); |
| | | |
| | | @Override |
| | | public void afterConnectionEstablished(WebSocketSession session) throws Exception { |
| | |
| | | |
| | | //获取ws请求的参数 |
| | | String query = session.getUri().getQuery(); |
| | | ip = query; |
| | | String ip = query; |
| | | String ipSession = query+","+session.getId(); |
| | | clients.put(ipSession, session); |
| | | |
| | | //TODO 判断是否存在有未有session连接的procss ffmpeg进程,有则关闭;后续优化 |
| | | pushVideoAsRTSP(query,"http://127.0.0.1:8090/rtsp/receive?ipSession="+ipSession); |
| | | //pushVideoAsRTSP(query,"http://127.0.0.1:8090/rtsp/receive"); |
| | | pushVideoAsRTSP(query,"http://127.0.0.1:8090/rtsp/receive?ip="+ip); |
| | | } |
| | | |
| | | @Override |
| | |
| | | System.out.println(">>>>>>>>>>终止了连接<<<<<<<<<<"); |
| | | Optional<String> first = clients.keySet().stream().filter(key -> key.contains(session.getId())).findFirst(); |
| | | clients.remove(first.get()); |
| | | |
| | | } |
| | | |
| | | @Override |
| | |
| | | public void sendVideo(String ipSession,byte[] data) { |
| | | BinaryMessage binaryMessage = new BinaryMessage(data); |
| | | String ip = ipSession.split(",")[0]; |
| | | String sessionId = ipSession.split(",")[1]; |
| | | //String sessionId = ipSession.split(",")[1]; |
| | | |
| | | for (Map.Entry<String, WebSocketSession> sessionEntry : clients.entrySet()) { |
| | | try { |
| | |
| | | //if (session.getId().equals(sessionId)&&session.isOpen()) { |
| | | //if (ip.equals(ip)&&session.getId().equals(sessionId)&&session.isOpen()) { |
| | | if (sessionEntry.getKey().contains(ip)&&session.isOpen()) { |
| | | session.sendMessage(binaryMessage); |
| | | synchronized (session) { |
| | | session.sendMessage(binaryMessage); |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |