From c431dd2c105fb783dfc103d71476f5f54ec41199 Mon Sep 17 00:00:00 2001
From: whyclxw <810412026@qq.com>
Date: 星期二, 21 一月 2025 08:58:19 +0800
Subject: [PATCH] 线程管理推送

---
 src/main/java/com/whyc/webSocket/ProcessSurveySocket.java |   92 +++++++++++++++++++++++++++------------------
 1 files changed, 55 insertions(+), 37 deletions(-)

diff --git a/src/main/java/com/whyc/webSocket/ProcessSurveySocket.java b/src/main/java/com/whyc/webSocket/ProcessSurveySocket.java
index 576bb44..a28425b 100644
--- a/src/main/java/com/whyc/webSocket/ProcessSurveySocket.java
+++ b/src/main/java/com/whyc/webSocket/ProcessSurveySocket.java
@@ -1,74 +1,92 @@
 package com.whyc.webSocket;
 
-import com.whyc.config.WebSocketConfig;
-import com.whyc.dto.Response;
-import com.whyc.factory.ThreadPoolExecutorFactory;
-import com.whyc.pojo.db_user.UserInf;
-import com.whyc.service.*;
+import com.whyc.service.ProcessSurveyService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import javax.websocket.*;
 import javax.websocket.server.ServerEndpoint;
-import java.io.IOException;
-
-
-/**
- * 绾跨▼绠$悊
- */
+import java.util.HashMap;
+import java.util.Map;
 @Component
-@ServerEndpoint(value = "/processSurvey",encoders = WebSocketEncoder.class,configurator = WebSocketConfig.class)
+@ServerEndpoint(value = "/processSurvey",encoders = WebSocketEncoder.class)
 public class ProcessSurveySocket {
+    private volatile Thread thread;
 
-    private Session session;
+    private static final int executeTime = 5000;
 
-    private Thread thread;
+    private volatile boolean runFlag=true;
+
+    private volatile Map<String,Thread> threadMap = new HashMap<>();
+
+    private volatile Map<Long,Boolean> threadFlagMap = new HashMap<>();
 
     private static ProcessSurveyService service;
 
+    private Session session;
+
     @Autowired
-    public void setProcessSurveySocket(ProcessSurveyService service) {
+    public void setService(ProcessSurveyService service){
         ProcessSurveySocket.service = service;
     }
 
-
     @OnOpen
-    public void onOpen(Session session) {
-        this.session = session;
-        Thread thread = new Thread("Thread_HomeSocket") {
-            @Override
-            public void run() {
-                try {
+    public void onOpen(Session session){
+        this.session=session;
+    }
 
-                    while (!currentThread().isInterrupted()) {
-                        while (!currentThread().isInterrupted()) {
-                            session.getBasicRemote().sendObject(service.getAll());
-                            sleep(5000);
+    @OnMessage
+    public synchronized void onMessage(Session session,String message){
+        thread = new Thread("Thread_processSurvey") {
+            public void run() {
+                while (runFlag && !isInterrupted()) {
+                    Thread thread = currentThread();
+                    threadFlagMap.put(thread.getId(), true);
+                    try {
+                        if (session.isOpen()) {
+                            //鎺ㄩ�佷俊鎭�
+                            synchronized (session) {
+                                session.getBasicRemote().sendObject(service.getAll());
+                            }
+                            threadFlagMap.put(thread.getId(),false);
                         }
+                        sleep(executeTime);
+                        //} catch (IOException | InterruptedException | EncodeException e) {
+                    } catch (Exception e) {
+                        interrupt();
                     }
-                } catch (Exception e) {
-                    this.interrupt();
                 }
             }
         };
         thread.start();
-        this.thread = thread;
+        threadFlagMap.put(thread.getId(),true);
+        //鍋滄鑰佺殑socket绾跨▼
+        Thread threadBefore = threadMap.get(session.getId());
+        if(threadBefore !=null && threadBefore.isAlive()){
+            while (threadFlagMap.get(threadBefore.getId())){
+            }
+            threadBefore.interrupt();
+        }
+        //灏嗙嚎绋嬪瓨鍌�,渚夸簬璋冪敤瀹氫綅
+        threadMap.put(session.getId(), this.thread);
     }
 
     @OnClose
-    public void onClose(CloseReason closeReason) throws IOException {
+    public void onClose(CloseReason closeReason){
         System.err.println("closeReason = " + closeReason);
-        if(session.isOpen()){
-            session.close();
+        runFlag = false;
+        if (thread != null && thread.isAlive()) {
+            thread.interrupt();
         }
+        threadMap.remove(session.getId());
     }
 
     @OnError
-    public void onError(Throwable error) throws IOException {
+    public void onError(Throwable error) {
         error.printStackTrace();
-        thread.isInterrupted();
-        if(session.isOpen()){
-            session.close();
+        if (thread != null && thread.isAlive()) {
+            thread.interrupt();
         }
+        threadMap.remove(session.getId());
     }
-}
\ No newline at end of file
+}

--
Gitblit v1.9.1