whycxzp
2023-05-12 d3465e119e5795a3addee8e3d3ae0ecc8a99fcc4
src/main/java/com/whyc/controller/DcimNorthController.java
@@ -5,19 +5,16 @@
import com.whyc.dto.interfaceB.DataB;
import com.whyc.dto.interfaceC.DataC;
import com.whyc.dto.interfaceC.PointC;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.kafka.core.KafkaAdmin;
import com.whyc.pojo.InterfaceCData;
import com.whyc.service.ConfigService;
import com.whyc.service.DataService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep;
@@ -28,8 +25,11 @@
    @Resource
    private KafkaTemplate template;
    @Resource
    private KafkaTemplate<Object,List<DataC>> batchTemplate;
    @Autowired
    private ConfigService configService;
    @Autowired
    private DataService dataService;
    /**
     * For testing only; not used in production.
@@ -38,8 +38,25 @@
     */
    @PostMapping("upload_config")
    public Response uploadConfig(@RequestBody DataB dataB){
        //no processing here
        //save to the database
        configService.add(dataB);
        return new Response().set(1,"Configuration upload completed");
    }
    @PostMapping("push_data")
    public Response pushData(@RequestBody InterfaceCData data){
        //save to the database
        dataService.add(data);
        //publish to Kafka
        String jsonStr = JSON.toJSONString(data);
        template.send("topic_device_data2",jsonStr);
        return new Response().set(1,"Push completed");
    }
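    /*
     * Illustrative consumer sketch for the push_data endpoint above: the controller serializes
     * InterfaceCData with JSON.toJSONString and publishes the raw string to "topic_device_data2",
     * so a listener would typically parse it back with the same JSON helper (parseObject is the
     * FastJSON counterpart; adjust if a different library backs the JSON class here). The nested
     * class name, group id, and the assumption that string deserialization is configured for the
     * consumer are illustrative only; a real listener would live in its own file.
     */
    @org.springframework.stereotype.Component
    static class DeviceDataListenerSketch {
        @org.springframework.kafka.annotation.KafkaListener(topics = "topic_device_data2", groupId = "dcim-north")
        public void onMessage(String jsonStr) {
            //reverse of JSON.toJSONString(data) in push_data
            InterfaceCData data = JSON.parseObject(jsonStr, InterfaceCData.class);
            //handle the record here, e.g. persist or forward it
            System.out.println("received device data: " + data);
        }
    }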
    @GetMapping("dataList")
    public Response getDataList(){
        //query from the database
        return dataService.getDataList();
    }
    @GetMapping("benchmarkTest")
@@ -90,60 +107,5 @@
    }
    @GetMapping("benchmarkTest3")
    public void benchmarkTest3(){
        long currentTimeMillis = System.currentTimeMillis();
        //generate 800,000 records, then send them
        List<DataC> dataCList = new LinkedList<>();
        for (int i = 0; i < 800000; i++) {
            DataC dataC = new DataC();
            dataC.setDevice_id(i+"");
            dataC.setDevice_type("test");
            dataC.setIdc("test");
            List<PointC> pointCList = new LinkedList<>();
            PointC pointC = new PointC();
            pointC.setPoint_code(i);
            pointC.setTag("test");
            pointC.setTimestamp(currentTimeMillis);
            pointC.setValue((double) i);
            pointCList.add(pointC);
            dataC.setPoints(pointCList);
            dataCList.add(dataC);
        }
        long startTime = System.currentTimeMillis();
        batchTemplate.send("topic_device_data2",dataCList);
        /*for (int i = 0; i < dataCList.size(); i++) {
            pool.execute(()->{
                template.send("topic_device_data2",dataCStr).addCallback(
                        success->{
                            //System.out.println(success);
                        },
                        failure->{
                            //System.err.println(failure.getMessage());
                            //failure.printStackTrace();
                        });
            });
        }*/
        long endTime = System.currentTimeMillis();
        System.out.println("花费时长(毫秒):"+(endTime-startTime));
    }
    @PostMapping("push_data")
    public Response pushData(@RequestBody DataC dataC){
        //publish point data to Kafka topic topic_device_data2
        String dataCStr = JSON.toJSONString(dataC);
        template.send("topic_device_data2",dataCStr).addCallback(
                success->{
                    System.out.println(success);
                },
                failure->{
                    System.err.println(failure.getMessage());
                    failure.printStackTrace();
                });
        return new Response().set(1,"推送完成");
    }
}
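
/*
 * Hedged sketch of the producer wiring that batchTemplate and benchmarkTest3 depend on: sending a
 * whole List<DataC> as one record only works if the value serializer can turn the list into bytes.
 * Everything below is an assumption for illustration (class and bean names, broker address,
 * serializer choice); the actual Kafka configuration is defined elsewhere.
 */
@org.springframework.context.annotation.Configuration
class BatchKafkaTemplateConfigSketch {

    @org.springframework.context.annotation.Bean
    public KafkaTemplate<Object, List<DataC>> batchTemplate() {
        java.util.Map<String, Object> props = new java.util.HashMap<>();
        //broker address is an assumption
        props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class);
        //Spring Kafka's JsonSerializer (Jackson-based) writes the whole list as a single JSON array record
        props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                org.springframework.kafka.support.serializer.JsonSerializer.class);
        org.springframework.kafka.core.DefaultKafkaProducerFactory<Object, List<DataC>> factory =
                new org.springframework.kafka.core.DefaultKafkaProducerFactory<>(props);
        return new KafkaTemplate<>(factory);
    }
}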