| | |
| | | import com.whyc.dto.interfaceB.DataB; |
| | | import com.whyc.dto.interfaceC.DataC; |
| | | import com.whyc.dto.interfaceC.PointC; |
| | | import org.apache.kafka.clients.admin.KafkaAdminClient; |
| | | import org.apache.kafka.common.KafkaFuture; |
| | | import org.springframework.kafka.core.KafkaAdmin; |
| | | import com.whyc.pojo.InterfaceCData; |
| | | import com.whyc.service.ConfigService; |
| | | import com.whyc.service.DataService; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.kafka.core.KafkaTemplate; |
| | | import org.springframework.web.bind.annotation.*; |
| | | |
| | | import javax.annotation.Resource; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.Set; |
| | | import java.util.concurrent.LinkedBlockingDeque; |
| | | import java.util.concurrent.ThreadPoolExecutor; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | import static java.lang.Thread.sleep; |
| | | |
| | |
| | | // Untyped Kafka template; the push_data handlers in this file use it to publish JSON strings to "topic_device_data2". |
| | | @Resource |
| | | private KafkaTemplate template; |
| | | |
| | | // Kafka template typed for List<DataC> values; benchmarkTest3 uses it to send a whole list as one message. |
| | | @Resource |
| | | private KafkaTemplate<Object,List<DataC>> batchTemplate; |
| | | // Service that receives uploaded DataB configurations via add(...) in uploadConfig. |
| | | @Autowired |
| | | private ConfigService configService; |
| | | |
| | | // Service behind the add(...) call of push_data and the getDataList() call of the dataList endpoint. |
| | | @Autowired |
| | | private DataService dataService; |
| | | |
| | | /** |
| | | * 这个仅测试使用,正式环境不使用 |
| | |
| | | */ |
| | | @PostMapping("upload_config") |
| | | public Response uploadConfig(@RequestBody DataB dataB){ |
| | | //不处理 |
| | | //发送到数据库 |
| | | configService.add(dataB); |
| | | return new Response().set(1,"配置上传完成"); |
| | | } |
| | | |
| | | /** |
| | | * Passes the pushed data to {@code dataService.add} and then publishes its JSON form to the |
| | | * Kafka topic "topic_device_data2". |
| | | * |
| | | * NOTE(review): a second handler in this file (the overload taking DataC) is also mapped to |
| | | * POST "push_data" - confirm which of the two is meant to be active. |
| | | * |
| | | * @param data payload taken from the request body |
| | | * @return a response set to code 1 with a completion message |
| | | */ |
| | | @PostMapping("push_data") |
| | | public Response pushData(@RequestBody InterfaceCData data){ |
| | | // Hand the payload to the data service first (per the original note: send to database) |
| | | dataService.add(data); |
| | | // Then publish the serialized payload to Kafka |
| | | template.send("topic_device_data2",JSON.toJSONString(data)); |
| | | Response reply = new Response(); |
| | | return reply.set(1,"推送完成"); |
| | | } |
| | | |
| | | @GetMapping("dataList") |
| | | public Response getDataList(){ |
| | | //发送到数据库 |
| | | return dataService.getDataList(); |
| | | } |
| | | |
| | | @GetMapping("benchmarkTest") |
| | |
| | | |
| | | } |
| | | |
| | | @GetMapping("benchmarkTest3") |
| | | public void benchmarkTest3(){ |
| | | long currentTimeMillis = System.currentTimeMillis(); |
| | | //生产80万条数据,然后发送 |
| | | List<DataC> dataCList = new LinkedList<>(); |
| | | for (int i = 0; i < 800000; i++) { |
| | | DataC dataC = new DataC(); |
| | | dataC.setDevice_id(i+""); |
| | | dataC.setDevice_type("test"); |
| | | dataC.setIdc("test"); |
| | | |
| | | List<PointC> pointCList = new LinkedList<>(); |
| | | PointC pointC = new PointC(); |
| | | pointC.setPoint_code(i); |
| | | pointC.setTag("test"); |
| | | pointC.setTimestamp(currentTimeMillis); |
| | | pointC.setValue(Double.parseDouble(i+"")); |
| | | pointCList.add(pointC); |
| | | dataC.setPoints(pointCList); |
| | | |
| | | dataCList.add(dataC); |
| | | } |
| | | long startTime = System.currentTimeMillis(); |
| | | batchTemplate.send("topic_device_data2",dataCList); |
| | | /*for (int i = 0; i < dataCList.size(); i++) { |
| | | pool.execute(()->{ |
| | | template.send("topic_device_data2",dataCStr).addCallback( |
| | | success->{ |
| | | //System.out.println(success); |
| | | }, |
| | | failure->{ |
| | | //System.err.println(failure.getMessage()); |
| | | //failure.printStackTrace(); |
| | | }); |
| | | }); |
| | | }*/ |
| | | long endTime = System.currentTimeMillis(); |
| | | System.out.println("花费时长(毫秒):"+(endTime-startTime)); |
| | | |
| | | } |
| | | |
| | | @PostMapping("push_data") |
| | | public Response pushData(@RequestBody DataC dataC){ |
| | | //点位数据发送到kafka topic_device_data |
| | | String dataCStr = JSON.toJSONString(dataC); |
| | | template.send("topic_device_data2",dataCStr).addCallback( |
| | | success->{ |
| | | System.out.println(success); |
| | | }, |
| | | failure->{ |
| | | System.err.println(failure.getMessage()); |
| | | failure.printStackTrace(); |
| | | }); |
| | | return new Response().set(1,"推送完成"); |
| | | } |
| | | |
| | | } |