package com.whyc.video; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import org.springframework.web.socket.BinaryMessage; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.BinaryWebSocketHandler; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; @Component public class WsHandler extends BinaryWebSocketHandler { /** * 存放所有在线的客户端 */ public static Map clients = new ConcurrentHashMap<>(); private WebSocketSession session; public Process process; public static Map processMap = new ConcurrentHashMap<>(); @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { System.out.println("新的加入"); this.session = session; //获取ws请求的参数 String query = session.getUri().getQuery(); String ip = query; String ipSession = query+","+session.getId(); clients.put(ipSession, session); pushVideoAsRTSP(query,"http://127.0.0.1:8090/rtsp/receive?ip="+ip); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { System.out.println(">>>>>>>>>>终止了连接<<<<<<<<<<"); Optional first = clients.keySet().stream().filter(key -> key.contains(session.getId())).findFirst(); clients.remove(first.get()); } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { System.out.println(">>>>>>>>>>发生错误,终止了连接<<<<<<<<<<"); Optional first = clients.keySet().stream().filter(key -> key.contains(session.getId())).findFirst(); clients.remove(first.get()); } public Integer pushVideoAsRTSP(String ip, String fileName){ int flag = -1; // ffmpeg位置,最好写在配置文件中 String ffmpegPath = "F:\\download\\ffmpeg-2021-04-11-git-309e3cc15c-essentials_build\\bin\\ffmpeg.exe "; //String ffmpegPath = ""; try { // 视频切换时,先销毁进程,全局变量Process process,方便进程销毁重启,即切换推流视频 /*if(process != null){ process.destroy(); System.out.println(">>>>>>>>>>推流视频切换<<<<<<<<<<"); }*/ System.out.println(">>>>>>>>>>推流视频新增<<<<<<<<<<"); String id = "rtsp://admin:a123456789@"+ip+":554/Streaming/Channels/101"; // cmd命令拼接,注意命令中存在空格 String command = ffmpegPath; // ffmpeg位置 command += " "; // ffmpeg开头,-re代表按照帧率发送,在推流时必须有 command += " -i \"" + id + "\""; // 指定要推送的视频 command += " -q 0 -f mpegts -codec:v mpeg1video -s 800x600 " + fileName; // 指定推送服务器,-f:指定格式 System.out.println("ffmpeg推流命令:" + command); //如果不存在进程,则建立ip对应的视频流进程 if(!processMap.containsKey(ip)){ // 运行cmd命令,获取其进程 process = Runtime.getRuntime().exec(command); //将进程存入到map processMap.put(ip,process); // 输出ffmpeg推流日志 BufferedReader br= new BufferedReader(new InputStreamReader(process.getErrorStream())); String line = ""; while ((line = br.readLine()) != null) { //System.out.println("视频推流信息[" + line + "]"); } flag = process.waitFor(); } }catch (Exception e){ e.printStackTrace(); } return flag; } public void sendVideo(String ipSession,byte[] data) { BinaryMessage binaryMessage = new BinaryMessage(data); String ip = ipSession.split(",")[0]; //String sessionId = ipSession.split(",")[1]; for (Map.Entry sessionEntry : clients.entrySet()) { try { WebSocketSession session = sessionEntry.getValue(); //if (session.getId().equals(sessionId)&&session.isOpen()) { //if (ip.equals(ip)&&session.getId().equals(sessionId)&&session.isOpen()) { if (sessionEntry.getKey().contains(ip)&&session.isOpen()) { synchronized (session) { session.sendMessage(binaryMessage); } } } catch (Exception e) { e.printStackTrace(); } } } }