From 40436887f67d5ea4713908ff9dcbd07121b62d86 Mon Sep 17 00:00:00 2001 From: whyclj <1525436766@qq.com> Date: 星期二, 20 十月 2020 16:48:31 +0800 Subject: [PATCH] 修改 --- EquieDeviceData/src/com/dev/data/Dev_UDPServer_Thread.java | 129 +++++++++++++++++++++++++----------------- 1 files changed, 76 insertions(+), 53 deletions(-) diff --git a/EquieDeviceData/src/com/dev/data/Dev_UDPServer_Thread.java b/EquieDeviceData/src/com/dev/data/Dev_UDPServer_Thread.java index fba898a..5ac2c43 100644 --- a/EquieDeviceData/src/com/dev/data/Dev_UDPServer_Thread.java +++ b/EquieDeviceData/src/com/dev/data/Dev_UDPServer_Thread.java @@ -8,6 +8,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Date; +import java.util.Enumeration; import java.util.List; import com.base.BaseData; @@ -52,13 +53,17 @@ InetAddress heartgroup = null; DatagramPacket heartpack = null; MulticastSocket socket1 = null; + int heart_error_count = 0; //通讯错误计数 while(true) { try { heartgroup = InetAddress.getByName(server_ip); heartpack = new DatagramPacket(new byte[0], 0, heartgroup, port); // 目的端口和MulticastSocket端口一样!! socket1 = createMulticastGroupAndJoin(heartgroup,port); - boolean isNewThread = true; - String last = Com.getDateTimeFormat(new Date(2000,1,1), Com.DTF_Y_M_D); //上一次建表的ymd + socket1.setSoTimeout(4000);; + + heart_error_count = 0; + //boolean isNewThread = true; + //String last = Com.getDateTimeFormat(new Date(2000,1,1), Com.DTF_Y_M_D); //上一次建表的ymd while(true) { //发送心跳包 try { @@ -79,9 +84,18 @@ byte[] buf = BaseData.createHeartData(); heartpack.setData(buf); socket1.send(heartpack); - //System.err.println(Com.getDateTimeFormat(new Date(), Com.DTF_YMDhms)); +// System.out.println(socket1.getNetworkInterface().getInetAddresses()); +// Enumeration<InetAddress> ips = socket1.getNetworkInterface().getInetAddresses(); +// while(ips.hasMoreElements()) { +// System.out.println(ips.nextElement().getHostAddress()); +// } + //System.err.println("发送心跳包"+socket1.isConnected()+"==="+socket1.isClosed()+"=="+socket1.isBound()+Com.getDateTimeFormat(new Date(), Com.DTF_YMDhms)); Thread.sleep(5000); + if(heart_error_count >10) { + break; + } } catch (Exception e) { + heart_error_count++; try { Thread.sleep(1000); } catch (InterruptedException e1) { @@ -112,64 +126,73 @@ }).start(); byte[] data = null; DatagramPacket packet= null; + int conn_error_count = 0; while(true) { try { data = new byte[MAX_DATA_COUNT]; group = InetAddress.getByName(server_ip); socket = createMulticastGroupAndJoin(group,port); //加入组播组,设置组播组的监听端口为6556 + socket.setSoTimeout(5000); DatagramPacket outpack = new DatagramPacket(new byte[0], 0, group, port); // 目的端口和MulticastSocket端口一样!! packet = new DatagramPacket(data, data.length); - - - while(true) { - socket.receive(packet); // 通过MulticastSocket实例端口从组播组接收数据 - byte[] headCount = new byte[2]; - System.arraycopy(data, 0, headCount, 0, headCount.length); - //System.out.println("数据包长度"+createPackHeadCount(data)); - int dataCount = BaseData.createPackHeadCount(headCount); - if(dataCount > data.length) { - continue; //数据量超标数据帧 - } - ByteBuffer bf = ByteBuffer.allocate(dataCount); - bf.order(ByteOrder.LITTLE_ENDIAN); - bf.put(data,0,dataCount); - bf.flip(); - - //System.out.println(ComBase.calchecksum(bf.array())); - //System.out.println("接收到数据:"+ComFn.bytesToHexString(bf.array(), bf.array().length)); - - - BaseData baseData = new BaseData(); - if(baseData.putByteBuffer(bf,cfg.isEquie_device_udppackage_cheak())) { - //System.err.println("packtype:"+baseData.packtype); - if(BaseData.ClildStood_DevType == baseData.packtype) { - //子站类型数据包 - if(baseData.devident >= 0x0701 && baseData.devident <= 0x0707) { - String client_ip = packet.getSocketAddress().toString(); - RecordUDPPackage.RecordUDPPackageData(RecordUDPPackage.PackageType_ChildStood, client_ip, bf.array()); - - ChildStoodParse_Thread childthread = new ChildStoodParse_Thread(conn_pool, devices,configs, baseData,ZIZHAN_DATATYPE); - new Thread(childthread).start(); - } - //ChildStoodParse_Thread childthread = new ChildStoodParse_Thread(conn_pool, devices,configs, baseData,ZIZHAN_DATATYPE); - //new Thread(childthread).start(); - thread_count ++; - }else if(BaseData.Battery_DevType == baseData.packtype) { - //蓄电池组类型数据包 - String client_ip = packet.getSocketAddress().toString(); - RecordUDPPackage.RecordUDPPackageData(RecordUDPPackage.PackageType_Battery, client_ip, bf.array()); - - BatteryParse_Thread battthread = new BatteryParse_Thread(conn_pool, devices, baseData); - new Thread(battthread).start(); - thread_count ++; - }else if(BaseData.SwitchBoard_DevType == baseData.packtype) { - String client_ip = packet.getSocketAddress().toString(); - RecordUDPPackage.RecordUDPPackageData(RecordUDPPackage.PackageType_SwitchBoard, client_ip, bf.array(),baseData.getDateFromDate()); - //开关量板类型数据包 - SwitchBoardParse_Thread switchthread = new SwitchBoardParse_Thread(conn_pool, devices, baseData); - new Thread(switchthread).start(); - thread_count ++; + conn_error_count = 0; + while(true) { + try { + socket.receive(packet); // 通过MulticastSocket实例端口从组播组接收数据 + byte[] headCount = new byte[2]; + System.arraycopy(data, 0, headCount, 0, headCount.length); + //System.out.println("数据包长度"+createPackHeadCount(data)); + int dataCount = BaseData.createPackHeadCount(headCount); + if(dataCount > data.length) { + continue; //数据量超标数据帧 } + ByteBuffer bf = ByteBuffer.allocate(dataCount); + bf.order(ByteOrder.LITTLE_ENDIAN); + bf.put(data,0,dataCount); + bf.flip(); + + //System.out.println(ComBase.calchecksum(bf.array())); + //System.out.println("接收到数据:"+ComFn.bytesToHexString(bf.array(), bf.array().length)); + + + BaseData baseData = new BaseData(); + if(baseData.putByteBuffer(bf,cfg.isEquie_device_udppackage_cheak())) { + //System.err.println("packtype:"+baseData.packtype); + if(BaseData.ClildStood_DevType == baseData.packtype) { + //子站类型数据包 + if(baseData.devident >= 0x0701 && baseData.devident <= 0x0707) { + String client_ip = packet.getSocketAddress().toString(); + RecordUDPPackage.RecordUDPPackageData(RecordUDPPackage.PackageType_ChildStood, client_ip, bf.array()); + + ChildStoodParse_Thread childthread = new ChildStoodParse_Thread(conn_pool, devices,configs, baseData,ZIZHAN_DATATYPE); + new Thread(childthread).start(); + } + //ChildStoodParse_Thread childthread = new ChildStoodParse_Thread(conn_pool, devices,configs, baseData,ZIZHAN_DATATYPE); + //new Thread(childthread).start(); + thread_count ++; + }else if(BaseData.Battery_DevType == baseData.packtype) { + //蓄电池组类型数据包 + String client_ip = packet.getSocketAddress().toString(); + RecordUDPPackage.RecordUDPPackageData(RecordUDPPackage.PackageType_Battery, client_ip, bf.array()); + + BatteryParse_Thread battthread = new BatteryParse_Thread(conn_pool, devices, baseData); + new Thread(battthread).start(); + thread_count ++; + }else if(BaseData.SwitchBoard_DevType == baseData.packtype) { + String client_ip = packet.getSocketAddress().toString(); + RecordUDPPackage.RecordUDPPackageData(RecordUDPPackage.PackageType_SwitchBoard, client_ip, bf.array(),baseData.getDateFromDate()); + //开关量板类型数据包 + SwitchBoardParse_Thread switchthread = new SwitchBoardParse_Thread(conn_pool, devices, baseData); + new Thread(switchthread).start(); + thread_count ++; + } + } + } catch (Exception e) { + conn_error_count ++; + if(conn_error_count >6) { + break; + } + e.printStackTrace(); } //测试程序正常运行异常情况下异常情况 //System.out.println(1/0); -- Gitblit v1.9.1