whyclj
2020-10-23 291d27a34b52911dbcfd8232f0685d34b0fc96a8
EquieDeviceData/src/com/dev/data/Dev_UDPServer_Thread.java
@@ -4,10 +4,13 @@
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Date;
import java.util.Enumeration;
import java.util.List;
import com.base.BaseData;
@@ -52,13 +55,17 @@
                 InetAddress heartgroup = null;
                 DatagramPacket heartpack = null;
                 MulticastSocket socket1 = null;
                 int heart_error_count = 0;               //通讯错误计数
                 while(true) {
                    try {
                     heartgroup = InetAddress.getByName(server_ip);
                     heartpack = new DatagramPacket(new byte[0], 0, heartgroup, port);             // 目的端口和MulticastSocket端口一样!!
                     socket1 = createMulticastGroupAndJoin(heartgroup,port);
                     boolean isNewThread = true;
                     String last = Com.getDateTimeFormat(new Date(2000,1,1), Com.DTF_Y_M_D);         //上一次建表的ymd
                     socket1.setSoTimeout(4000);;
                     heart_error_count = 0;
                     //boolean isNewThread = true;
                     //String last = Com.getDateTimeFormat(new Date(2000,1,1), Com.DTF_Y_M_D);         //上一次建表的ymd
                     while(true) {
                        //发送心跳包
                        try {
@@ -79,9 +86,18 @@
                           byte[] buf = BaseData.createHeartData();
                           heartpack.setData(buf);
                           socket1.send(heartpack);
                           //System.err.println(Com.getDateTimeFormat(new Date(), Com.DTF_YMDhms));
//                           System.out.println(socket1.getNetworkInterface().getInetAddresses());
//                           Enumeration<InetAddress> ips = socket1.getNetworkInterface().getInetAddresses();
//                           while(ips.hasMoreElements()) {
//                              System.out.println(ips.nextElement().getHostAddress());
//                           }
                           //System.err.println("发送心跳包"+socket1.isConnected()+"==="+socket1.isClosed()+"=="+socket1.isBound()+Com.getDateTimeFormat(new Date(), Com.DTF_YMDhms));
                           Thread.sleep(5000);
                           if(heart_error_count >10) {
                              break;
                           }
                        } catch (Exception e) {
                           heart_error_count++;
                           try {
                              Thread.sleep(1000);
                           } catch (InterruptedException e1) {
@@ -112,64 +128,73 @@
          }).start(); 
          byte[] data = null;
          DatagramPacket packet= null;
          int conn_error_count = 0;
          while(true) {
            try {
               data = new byte[MAX_DATA_COUNT];
               group = InetAddress.getByName(server_ip);       
               socket = createMulticastGroupAndJoin(group,port);  //加入组播组,设置组播组的监听端口为6556  
               socket.setSoTimeout(5000);
               DatagramPacket outpack = new DatagramPacket(new byte[0], 0, group, port); // 目的端口和MulticastSocket端口一样!!
               packet = new DatagramPacket(data, data.length);  
               while(true) {
                  socket.receive(packet);       // 通过MulticastSocket实例端口从组播组接收数据
                  byte[] headCount = new byte[2];
                  System.arraycopy(data, 0, headCount, 0, headCount.length);
                  //System.out.println("数据包长度"+createPackHeadCount(data));
                  int dataCount = BaseData.createPackHeadCount(headCount);
                  if(dataCount > data.length) {
                     continue;         //数据量超标数据帧
                  }
                  ByteBuffer bf = ByteBuffer.allocate(dataCount);
                  bf.order(ByteOrder.LITTLE_ENDIAN);
                  bf.put(data,0,dataCount);
                  bf.flip();
                  //System.out.println(ComBase.calchecksum(bf.array()));
                  //System.out.println("接收到数据:"+ComFn.bytesToHexString(bf.array(), bf.array().length));
                  BaseData baseData = new BaseData();
                  if(baseData.putByteBuffer(bf,cfg.isEquie_device_udppackage_cheak())) {
                     //System.err.println("packtype:"+baseData.packtype);
                     if(BaseData.ClildStood_DevType == baseData.packtype) {
                        //子站类型数据包
                        if(baseData.devident >= 0x0701 && baseData.devident <= 0x0707) {
                           String client_ip = packet.getSocketAddress().toString();
                           RecordUDPPackage.RecordUDPPackageData(RecordUDPPackage.PackageType_ChildStood, client_ip, bf.array());
                           ChildStoodParse_Thread childthread = new ChildStoodParse_Thread(conn_pool, devices,configs, baseData,ZIZHAN_DATATYPE);
                           new Thread(childthread).start();
                        }
                        //ChildStoodParse_Thread childthread = new ChildStoodParse_Thread(conn_pool, devices,configs, baseData,ZIZHAN_DATATYPE);
                        //new Thread(childthread).start();
                        thread_count ++;
                     }else if(BaseData.Battery_DevType == baseData.packtype) {
                        //蓄电池组类型数据包
                        String client_ip = packet.getSocketAddress().toString();
                        RecordUDPPackage.RecordUDPPackageData(RecordUDPPackage.PackageType_Battery, client_ip, bf.array());
                        BatteryParse_Thread battthread = new BatteryParse_Thread(conn_pool, devices, baseData);
                        new Thread(battthread).start();
                        thread_count ++;
                     }else if(BaseData.SwitchBoard_DevType == baseData.packtype) {
                        String client_ip = packet.getSocketAddress().toString();
                        RecordUDPPackage.RecordUDPPackageData(RecordUDPPackage.PackageType_SwitchBoard, client_ip, bf.array(),baseData.getDateFromDate());
                        //开关量板类型数据包
                        SwitchBoardParse_Thread switchthread = new SwitchBoardParse_Thread(conn_pool, devices, baseData);
                        new Thread(switchthread).start();
                        thread_count ++;
               conn_error_count = 0;
               while(true) {
                  try {
                     socket.receive(packet);       // 通过MulticastSocket实例端口从组播组接收数据
                     byte[] headCount = new byte[2];
                     System.arraycopy(data, 0, headCount, 0, headCount.length);
                     //System.out.println("数据包长度"+createPackHeadCount(data));
                     int dataCount = BaseData.createPackHeadCount(headCount);
                     if(dataCount > data.length) {
                        continue;         //数据量超标数据帧
                     }
                     ByteBuffer bf = ByteBuffer.allocate(dataCount);
                     bf.order(ByteOrder.LITTLE_ENDIAN);
                     bf.put(data,0,dataCount);
                     bf.flip();
                     //System.out.println(ComBase.calchecksum(bf.array()));
                     //System.out.println("接收到数据:"+ComFn.bytesToHexString(bf.array(), bf.array().length));
                     BaseData baseData = new BaseData();
                     if(baseData.putByteBuffer(bf,cfg.isEquie_device_udppackage_cheak())) {
                        //System.err.println("packtype:"+baseData.packtype);
                        if(BaseData.ClildStood_DevType == baseData.packtype) {
                           //子站类型数据包
                           if(baseData.devident >= 0x0701 && baseData.devident <= 0x0707) {
                              String client_ip = packet.getSocketAddress().toString();
                              RecordUDPPackage.RecordUDPPackageData(RecordUDPPackage.PackageType_ChildStood, client_ip, bf.array());
                              ChildStoodParse_Thread childthread = new ChildStoodParse_Thread(conn_pool, devices,configs, baseData,ZIZHAN_DATATYPE);
                              new Thread(childthread).start();
                           }
                           //ChildStoodParse_Thread childthread = new ChildStoodParse_Thread(conn_pool, devices,configs, baseData,ZIZHAN_DATATYPE);
                           //new Thread(childthread).start();
                           thread_count ++;
                        }else if(BaseData.Battery_DevType == baseData.packtype) {
                           //蓄电池组类型数据包
                           String client_ip = packet.getSocketAddress().toString();
                           RecordUDPPackage.RecordUDPPackageData(RecordUDPPackage.PackageType_Battery, client_ip, bf.array());
                           BatteryParse_Thread battthread = new BatteryParse_Thread(conn_pool, devices, baseData);
                           new Thread(battthread).start();
                           thread_count ++;
                        }else if(BaseData.SwitchBoard_DevType == baseData.packtype) {
                           String client_ip = packet.getSocketAddress().toString();
                           RecordUDPPackage.RecordUDPPackageData(RecordUDPPackage.PackageType_SwitchBoard, client_ip, bf.array(),baseData.getDateFromDate());
                           //开关量板类型数据包
                           SwitchBoardParse_Thread switchthread = new SwitchBoardParse_Thread(conn_pool, devices, baseData);
                           new Thread(switchthread).start();
                           thread_count ++;
                        }
                     }
                  } catch (Exception e) {
                     conn_error_count ++;
                     if(conn_error_count >=6) {
                        break;
                     }
                     //e.printStackTrace();
                  }
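                  // Test hook: uncomment the line below to force a runtime exception (1/0) and check how the program behaves when an unexpected error occurs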
                  //System.out.println(1/0);
@@ -217,6 +242,8 @@
   private MulticastSocket createMulticastGroupAndJoin(InetAddress group,int port) {
      try {  
           MulticastSocket socket = new MulticastSocket(port); // 初始化MulticastSocket类并将端口号与之关联  
           //InetAddress bind = new InetSocketAddress("192.0.0.74", port);
           //MulticastSocket socket = new MulticastSocket(bind); // 初始化MulticastSocket类并将端口号与之关联
           socket.setLoopbackMode(false);
           socket.joinGroup(group); // 加入此组播组  
           return socket;