whycxzp
2021-04-19 3ce5138c7cb230c531c3e0ee43a6e9cb361719a1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package com.whyc.video;
 
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.BinaryWebSocketHandler;
 
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
 
@Component
public class WsHandler extends BinaryWebSocketHandler {
 
    /**
     * 存放所有在线的客户端
     */
    public static Map<String, WebSocketSession> clients = new ConcurrentHashMap<>();
 
    private WebSocketSession session;
 
    public Process process;
 
    public static Map<String,Process> processMap = new ConcurrentHashMap<>();
 
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        //System.out.println("新的加入");
        this.session = session;
 
        //获取ws请求的参数
        String query = session.getUri().getQuery();
        String ip = query;
        String ipSession = query+","+session.getId();
        clients.put(ipSession, session);
 
        pushVideoAsRTSP(query,"http://127.0.0.1:8090/rtsp/receive?ip="+ip);
    }
 
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        //System.out.println(">>>>>>>>>>终止了连接<<<<<<<<<<");
        Optional<String> first = clients.keySet().stream().filter(key -> key.contains(session.getId())).findFirst();
        clients.remove(first.get());
 
    }
 
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        //System.out.println(">>>>>>>>>>发生错误,终止了连接<<<<<<<<<<");
        Optional<String> first = clients.keySet().stream().filter(key -> key.contains(session.getId())).findFirst();
        clients.remove(first.get());
    }
 
    public Integer pushVideoAsRTSP(String ip, String fileName){
        int flag = -1;
        // ffmpeg位置,最好写在配置文件中
        String ffmpegPath = "F:\\download\\ffmpeg-2021-04-11-git-309e3cc15c-essentials_build\\bin\\ffmpeg.exe ";
        //String ffmpegPath = "";
        try {
            //System.out.println(">>>>>>>>>>推流视频新增<<<<<<<<<<");
            String id = "rtsp://admin:a123456789@"+ip+":554/Streaming/Channels/101";
            // cmd命令拼接,注意命令中存在空格
            String command = ffmpegPath; // ffmpeg位置
            command += "  "; // ffmpeg开头,-re代表按照帧率发送,在推流时必须有
            command += " -i \"" + id + "\""; // 指定要推送的视频
            command += " -q 0 -f mpegts -codec:v mpeg1video -s 800x600 " + fileName; // 指定推送服务器,-f:指定格式
            //System.out.println("ffmpeg推流命令:" + command);
 
            //如果不存在进程,则建立ip对应的视频流进程
            if(!processMap.containsKey(ip)){
                // 运行cmd命令,获取其进程
                process = Runtime.getRuntime().exec(command);
                //将进程存入到map
                processMap.put(ip,process);
                // 输出ffmpeg推流日志
                BufferedReader br= new BufferedReader(new InputStreamReader(process.getErrorStream()));
                String line = "";
                while ((line = br.readLine()) != null) {
                    //System.out.println("视频推流信息[" + line + "]");
                }
                flag = process.waitFor();
            }
 
        }catch (Exception e){
            e.printStackTrace();
        }
        return flag;
    }
 
    public void sendVideo(String ipSession,byte[] data) {
        BinaryMessage binaryMessage = new BinaryMessage(data);
        String ip = ipSession.split(",")[0];
        //String sessionId = ipSession.split(",")[1];
 
        for (Map.Entry<String, WebSocketSession> sessionEntry : clients.entrySet()) {
            try {
                WebSocketSession session = sessionEntry.getValue();
                //if (session.getId().equals(sessionId)&&session.isOpen()) {
                //if (ip.equals(ip)&&session.getId().equals(sessionId)&&session.isOpen()) {
                if (sessionEntry.getKey().contains(ip)&&session.isOpen()) {
                    synchronized (session) {
                        session.sendMessage(binaryMessage);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
 
    }
}