whycxzp
2021-04-19 1d487bd959c73fdb9a5b4c895173ece167ed5b02
更新 视频接口
1个文件已删除
3个文件已修改
2个文件已添加
105 ■■■■■ 已修改文件
src/main/java/com/whyc/config/ScheduledConfig.java 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/whyc/video/CustomProcess.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/whyc/video/RtspController.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/whyc/video/VideoProcessTimer.java 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/whyc/video/WebsocketConfiguration.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/whyc/video/WsHandler.java 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/whyc/config/ScheduledConfig.java
New file
@@ -0,0 +1,18 @@
package com.whyc.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
public class ScheduledConfig {
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduling = new ThreadPoolTaskScheduler();
        scheduling.setPoolSize(10);
        scheduling.initialize();
        return scheduling;
    }
}
src/main/java/com/whyc/video/CustomProcess.java
File was deleted
src/main/java/com/whyc/video/RtspController.java
@@ -22,7 +22,8 @@
    @RequestMapping("/receive")
    @ResponseBody
    public String receive(HttpServletRequest request, Object response) {
        String ipSession = request.getParameter("ipSession");
        //String ipSession = request.getParameter("ipSession");
        String ip = request.getParameter("ip");
        System.out.println("method:" + request.getMethod());
        try {
            ServletInputStream inputStream = request.getInputStream();
@@ -36,7 +37,7 @@
//                System.out.println("");
//                System.out.println(len);
//                System.out.println("--------------------------------------------------------");
                wsHandler.sendVideo(ipSession,data);
                wsHandler.sendVideo(ip,data);
            }
        } catch (Exception e) {
            e.printStackTrace();
src/main/java/com/whyc/video/VideoProcessTimer.java
New file
@@ -0,0 +1,39 @@
package com.whyc.video;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.socket.WebSocketSession;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@EnableScheduling
@Configuration
public class VideoProcessTimer {
    /**检查是否有不需要的ffmpeg进程,有则关掉,节约资源,10秒钟检查一次*/
    @Scheduled(cron = "0/10 * * * * *")
    public void checkProcess(){
        LocalDateTime now = LocalDateTime.now();
        System.out.println("当前时间:"+now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        Map<String, WebSocketSession> clients = WsHandler.clients;
        Map<String, Process> processMap = WsHandler.processMap;
        //processMap中的ip,如果clients中不存在,则关闭这个process
        Set<String> clientIpSessions = clients.keySet();
        Set<String> clientIps = clientIpSessions.stream().map(ipSession -> ipSession.split(",")[0]).collect(Collectors.toSet());
        processMap.keySet().stream().forEach(ip->{
            if(!clientIps.contains(ip)){
                Process process = processMap.get(ip);
                process.destroy();
                processMap.remove(ip);
            }
        });
    }
}
src/main/java/com/whyc/video/WebsocketConfiguration.java
@@ -28,7 +28,7 @@
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
        webSocketHandlerRegistry.addHandler(wsHandler,"/video").addInterceptors(wsIntercept).setAllowedOrigins("*");
        webSocketHandlerRegistry.addHandler(wsHandler,"/video1").addInterceptors(wsIntercept).setAllowedOrigins("*");
    }
    @Bean
src/main/java/com/whyc/video/WsHandler.java
@@ -22,15 +22,13 @@
    /**
     * 存放所有在线的客户端
     */
    private volatile static Map<String, WebSocketSession> clients = new ConcurrentHashMap<>();
    public static Map<String, WebSocketSession> clients = new ConcurrentHashMap<>();
    private WebSocketSession session;
    public Process process;
    private String ip;
    private static Map<String,Process> processMap = new ConcurrentHashMap<>();
    public static Map<String,Process> processMap = new ConcurrentHashMap<>();
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
@@ -39,13 +37,11 @@
        //获取ws请求的参数
        String query = session.getUri().getQuery();
        ip = query;
        String ip = query;
        String ipSession = query+","+session.getId();
        clients.put(ipSession, session);
        //TODO 判断是否存在有未有session连接的procss ffmpeg进程,有则关闭;后续优化
        pushVideoAsRTSP(query,"http://127.0.0.1:8090/rtsp/receive?ipSession="+ipSession);
        //pushVideoAsRTSP(query,"http://127.0.0.1:8090/rtsp/receive");
        pushVideoAsRTSP(query,"http://127.0.0.1:8090/rtsp/receive?ip="+ip);
    }
    @Override
@@ -53,6 +49,7 @@
        System.out.println(">>>>>>>>>>终止了连接<<<<<<<<<<");
        Optional<String> first = clients.keySet().stream().filter(key -> key.contains(session.getId())).findFirst();
        clients.remove(first.get());
    }
    @Override
@@ -106,7 +103,7 @@
    public void sendVideo(String ipSession,byte[] data) {
        BinaryMessage binaryMessage = new BinaryMessage(data);
        String ip = ipSession.split(",")[0];
        String sessionId = ipSession.split(",")[1];
        //String sessionId = ipSession.split(",")[1];
        for (Map.Entry<String, WebSocketSession> sessionEntry : clients.entrySet()) {
            try {
@@ -114,8 +111,10 @@
                //if (session.getId().equals(sessionId)&&session.isOpen()) {
                //if (ip.equals(ip)&&session.getId().equals(sessionId)&&session.isOpen()) {
                if (sessionEntry.getKey().contains(ip)&&session.isOpen()) {
                    synchronized (session) {
                    session.sendMessage(binaryMessage);
                }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }