package com.whyc.controller;

import com.alibaba.fastjson2.JSON;
import com.whyc.dto.Response;
import com.whyc.dto.interfaceB.DataB;
import com.whyc.dto.interfaceC.DataC;
import com.whyc.dto.interfaceC.PointC;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;
import java.util.LinkedList;
import java.util.List;

import static java.lang.Thread.sleep;

@RestController
@RequestMapping("dcim_north")
public class DcimNorthController {

    @Resource
    private KafkaTemplate<String, String> template;

    /** Sends a whole List<DataC> as a single record; requires a value serializer that can handle the list (see the configuration sketch at the end of this file). */
    @Resource
    private KafkaTemplate<String, List<DataC>> batchTemplate;

    /**
     * For testing only; not used in the production environment.
     * @param dataB uploaded configuration payload
     * @return acknowledgement response
     */
    @PostMapping("upload_config")
    public Response uploadConfig(@RequestBody DataB dataB) {
        // Intentionally a no-op.
        return new Response().set(1, "Configuration upload complete");
    }

    @GetMapping("benchmarkTest")
    public void benchmarkTest() {
        long currentTimeMillis = System.currentTimeMillis();
        // Build 8,000 records up front, then replay them once per second.
        List<String> dataCList = new LinkedList<>();
        for (int i = 0; i < 8000; i++) {
            DataC dataC = new DataC();
            dataC.setDevice_id(i + "");
            dataC.setDevice_type("test");
            dataC.setIdc("test");
            List<PointC> pointCList = new LinkedList<>();
            PointC pointC = new PointC();
            pointC.setPoint_code(i);
            pointC.setTag("test");
            pointC.setTimestamp(currentTimeMillis);
            pointC.setValue((double) i);
            pointCList.add(pointC);
            dataC.setPoints(pointCList);
            dataCList.add(JSON.toJSONString(dataC));
        }
        // Note: this loop never returns; it keeps the request thread producing until the application stops.
        while (true) {
            for (String dataCStr : dataCList) {
                template.send("topic_device_data2", dataCStr);
            }
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @GetMapping("benchmarkTest3")
    public void benchmarkTest3() {
        long currentTimeMillis = System.currentTimeMillis();
        // Build 800,000 records, then send them all as a single batch record.
        List<DataC> dataCList = new LinkedList<>();
        for (int i = 0; i < 800000; i++) {
            DataC dataC = new DataC();
            dataC.setDevice_id(i + "");
            dataC.setDevice_type("test");
            dataC.setIdc("test");
            List<PointC> pointCList = new LinkedList<>();
            PointC pointC = new PointC();
            pointC.setPoint_code(i);
            pointC.setTag("test");
            pointC.setTimestamp(currentTimeMillis);
            pointC.setValue((double) i);
            pointCList.add(pointC);
            dataC.setPoints(pointCList);
            dataCList.add(dataC);
        }
        long startTime = System.currentTimeMillis();
        batchTemplate.send("topic_device_data2", dataCList);
        long endTime = System.currentTimeMillis();
        System.out.println("Elapsed time (ms): " + (endTime - startTime));
    }

    @PostMapping("push_data")
    public Response pushData(@RequestBody DataC dataC) {
        // Forward the point data to Kafka topic topic_device_data2.
        String dataCStr = JSON.toJSONString(dataC);
        template.send("topic_device_data2", dataCStr).addCallback(
                success -> System.out.println(success),
                failure -> {
                    System.err.println(failure.getMessage());
                    failure.printStackTrace();
                });
        return new Response().set(1, "Push complete");
    }
}
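/*
 * A minimal configuration sketch for the two templates injected above. The real
 * project presumably defines these beans elsewhere; the bean names, the bootstrap
 * address, and the use of fastjson2 for the batch value serializer are assumptions
 * for illustration, not the project's confirmed setup. The point it shows: `template`
 * can use a plain StringSerializer, while `batchTemplate` needs a value serializer
 * that turns a List<DataC> into bytes, e.g. one JSON array per Kafka record.
 *
 * import java.util.HashMap;
 * import java.util.Map;
 * import org.apache.kafka.clients.producer.ProducerConfig;
 * import org.apache.kafka.common.serialization.StringSerializer;
 * import org.springframework.context.annotation.Bean;
 * import org.springframework.context.annotation.Configuration;
 * import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 *
 * @Configuration
 * class KafkaTemplateConfigSketch {
 *
 *     // Assumption: in the real project the broker address would come from application properties.
 *     private static final String BOOTSTRAP = "localhost:9092";
 *
 *     private Map<String, Object> baseProps() {
 *         Map<String, Object> props = new HashMap<>();
 *         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);
 *         return props;
 *     }
 *
 *     @Bean
 *     public KafkaTemplate<String, String> template() {
 *         // Plain string key/value serializers for the single-record template.
 *         return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(
 *                 baseProps(), new StringSerializer(), new StringSerializer()));
 *     }
 *
 *     @Bean
 *     public KafkaTemplate<String, List<DataC>> batchTemplate() {
 *         // Serialize the whole list as one JSON array via fastjson2; Kafka's
 *         // Serializer is a functional interface, so a lambda suffices.
 *         return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(
 *                 baseProps(), new StringSerializer(),
 *                 (topic, data) -> JSON.toJSONBytes(data)));
 *     }
 * }
 */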