whycxzp
2021-04-19 1d487bd959c73fdb9a5b4c895173ece167ed5b02
src/main/java/com/whyc/video/WsHandler.java
@@ -22,15 +22,13 @@
    // NOTE(review): clients and processMap are each declared twice below with
    // different access modifiers; presumably these are the before/after lines of
    // a diff whose +/- markers were lost. Confirm which declaration is current.
    /**
     * Holds all online clients.
     * Entries are added in afterConnectionEstablished under a key built from the
     * request query string, a comma, and the WebSocket session id.
     */
    private volatile static Map<String, WebSocketSession> clients = new ConcurrentHashMap<>();
    public static Map<String, WebSocketSession> clients = new ConcurrentHashMap<>();
    // Per-handler instance fields: a WebSocket session, a Process and an ip string.
    private WebSocketSession session;
    public Process process;
    private String ip;
    // Static map from a String key to a Process; how it is populated is not
    // visible in this fragment.
    private static Map<String,Process> processMap = new ConcurrentHashMap<>();
    public static Map<String,Process> processMap = new ConcurrentHashMap<>();
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
@@ -39,13 +37,11 @@
        //获取ws请求的参数
        String query = session.getUri().getQuery();
        ip = query;
        String ip = query;
        String ipSession = query+","+session.getId();
        clients.put(ipSession, session);
        //TODO: check whether any ffmpeg process exists that has no session connected to it and, if so, stop it; to be optimized later
        pushVideoAsRTSP(query,"http://127.0.0.1:8090/rtsp/receive?ipSession="+ipSession);
        //pushVideoAsRTSP(query,"http://127.0.0.1:8090/rtsp/receive");
        pushVideoAsRTSP(query,"http://127.0.0.1:8090/rtsp/receive?ip="+ip);
    }
    @Override
@@ -53,6 +49,7 @@
        System.out.println(">>>>>>>>>>终止了连接<<<<<<<<<<");
        Optional<String> first = clients.keySet().stream().filter(key -> key.contains(session.getId())).findFirst();
        clients.remove(first.get());
    }
    @Override
@@ -106,7 +103,7 @@
    public void sendVideo(String ipSession,byte[] data) {
        BinaryMessage binaryMessage = new BinaryMessage(data);
        String ip = ipSession.split(",")[0];
        String sessionId = ipSession.split(",")[1];
        //String sessionId = ipSession.split(",")[1];
        for (Map.Entry<String, WebSocketSession> sessionEntry : clients.entrySet()) {
            try {
@@ -114,7 +111,9 @@
                //if (session.getId().equals(sessionId)&&session.isOpen()) {
                //if (ip.equals(ip)&&session.getId().equals(sessionId)&&session.isOpen()) {
                if (sessionEntry.getKey().contains(ip)&&session.isOpen()) {
                    session.sendMessage(binaryMessage);
                    synchronized (session) {
                        session.sendMessage(binaryMessage);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();