src/main/java/com/whyc/config/ScheduledConfig.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/whyc/video/CustomProcess.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/whyc/video/RtspController.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/whyc/video/VideoProcessTimer.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/whyc/video/WebsocketConfiguration.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/whyc/video/WsHandler.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/main/java/com/whyc/config/ScheduledConfig.java
New file @@ -0,0 +1,18 @@ package com.whyc.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @Configuration public class ScheduledConfig { @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduling = new ThreadPoolTaskScheduler(); scheduling.setPoolSize(10); scheduling.initialize(); return scheduling; } } src/main/java/com/whyc/video/CustomProcess.java
File was deleted src/main/java/com/whyc/video/RtspController.java
@@ -22,7 +22,8 @@ @RequestMapping("/receive") @ResponseBody public String receive(HttpServletRequest request, Object response) { String ipSession = request.getParameter("ipSession"); //String ipSession = request.getParameter("ipSession"); String ip = request.getParameter("ip"); System.out.println("method:" + request.getMethod()); try { ServletInputStream inputStream = request.getInputStream(); @@ -36,7 +37,7 @@ // System.out.println(""); // System.out.println(len); // System.out.println("--------------------------------------------------------"); wsHandler.sendVideo(ipSession,data); wsHandler.sendVideo(ip,data); } } catch (Exception e) { e.printStackTrace(); src/main/java/com/whyc/video/VideoProcessTimer.java
New file @@ -0,0 +1,39 @@ package com.whyc.video; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.web.socket.WebSocketSession; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @EnableScheduling @Configuration public class VideoProcessTimer { /**检查是否有不需要的ffmpeg进程,有则关掉,节约资源,10秒钟检查一次*/ @Scheduled(cron = "0/10 * * * * *") public void checkProcess(){ LocalDateTime now = LocalDateTime.now(); System.out.println("当前时间:"+now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); Map<String, WebSocketSession> clients = WsHandler.clients; Map<String, Process> processMap = WsHandler.processMap; //processMap中的ip,如果clients中不存在,则关闭这个process Set<String> clientIpSessions = clients.keySet(); Set<String> clientIps = clientIpSessions.stream().map(ipSession -> ipSession.split(",")[0]).collect(Collectors.toSet()); processMap.keySet().stream().forEach(ip->{ if(!clientIps.contains(ip)){ Process process = processMap.get(ip); process.destroy(); processMap.remove(ip); } }); } } src/main/java/com/whyc/video/WebsocketConfiguration.java
@@ -28,7 +28,7 @@ @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) { webSocketHandlerRegistry.addHandler(wsHandler,"/video").addInterceptors(wsIntercept).setAllowedOrigins("*"); webSocketHandlerRegistry.addHandler(wsHandler,"/video1").addInterceptors(wsIntercept).setAllowedOrigins("*"); } @Bean src/main/java/com/whyc/video/WsHandler.java
@@ -22,15 +22,13 @@ /** * 存放所有在线的客户端 */ private volatile static Map<String, WebSocketSession> clients = new ConcurrentHashMap<>(); public static Map<String, WebSocketSession> clients = new ConcurrentHashMap<>(); private WebSocketSession session; public Process process; private String ip; private static Map<String,Process> processMap = new ConcurrentHashMap<>(); public static Map<String,Process> processMap = new ConcurrentHashMap<>(); @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { @@ -39,13 +37,11 @@ //获取ws请求的参数 String query = session.getUri().getQuery(); ip = query; String ip = query; String ipSession = query+","+session.getId(); clients.put(ipSession, session); //TODO 判断是否存在有未有session连接的procss ffmpeg进程,有则关闭;后续优化 pushVideoAsRTSP(query,"http://127.0.0.1:8090/rtsp/receive?ipSession="+ipSession); //pushVideoAsRTSP(query,"http://127.0.0.1:8090/rtsp/receive"); pushVideoAsRTSP(query,"http://127.0.0.1:8090/rtsp/receive?ip="+ip); } @Override @@ -53,6 +49,7 @@ System.out.println(">>>>>>>>>>终止了连接<<<<<<<<<<"); Optional<String> first = clients.keySet().stream().filter(key -> key.contains(session.getId())).findFirst(); clients.remove(first.get()); } @Override @@ -106,7 +103,7 @@ public void sendVideo(String ipSession,byte[] data) { BinaryMessage binaryMessage = new BinaryMessage(data); String ip = ipSession.split(",")[0]; String sessionId = ipSession.split(",")[1]; //String sessionId = ipSession.split(",")[1]; for (Map.Entry<String, WebSocketSession> sessionEntry : clients.entrySet()) { try { @@ -114,8 +111,10 @@ //if (session.getId().equals(sessionId)&&session.isOpen()) { //if (ip.equals(ip)&&session.getId().equals(sessionId)&&session.isOpen()) { if (sessionEntry.getKey().contains(ip)&&session.isOpen()) { synchronized (session) { session.sendMessage(binaryMessage); } } } catch (Exception e) { e.printStackTrace(); }