whycxzp
2023-05-11 b0c4d3f59557206ee88ae5af43f4c835e0060149
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package com.whyc.controller;
 
import com.alibaba.fastjson2.JSON;
import com.whyc.dto.Response;
import com.whyc.dto.interfaceB.DataB;
import com.whyc.dto.interfaceC.DataC;
import com.whyc.dto.interfaceC.PointC;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
 
import javax.annotation.Resource;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
import static java.lang.Thread.sleep;
 
@RestController
@RequestMapping("dcim_north")
public class DcimNorthController{
 
    @Resource
    private KafkaTemplate template;
 
    @Resource
    private KafkaTemplate<Object,List<DataC>> batchTemplate;
 
    /**
     * 这个仅测试使用,正式环境不使用
     * @param dataB
     * @return
     */
    @PostMapping("upload_config")
    public Response uploadConfig(@RequestBody DataB dataB){
        //不处理
        return new Response().set(1,"配置上传完成");
    }
 
    @GetMapping("benchmarkTest")
    public void benchmarkTest(){
        long currentTimeMillis = System.currentTimeMillis();
        //生产80万条数据,然后发送
        List<String> dataCList = new LinkedList<>();
        for (int i = 0; i < 8000; i++) {
            DataC dataC = new DataC();
            dataC.setDevice_id(i+"");
            dataC.setDevice_type("test");
            dataC.setIdc("test");
 
            List<PointC> pointCList = new LinkedList<>();
            PointC pointC = new PointC();
            pointC.setPoint_code(i);
            pointC.setTag("test");
            pointC.setTimestamp(currentTimeMillis);
            pointC.setValue(Double.parseDouble(i+""));
            pointCList.add(pointC);
            dataC.setPoints(pointCList);
 
            String dataCStr = JSON.toJSONString(dataC);
            dataCList.add(dataCStr);
        }
        //long startTime = System.currentTimeMillis();
        while (true) {
            for (int i = 0; i < dataCList.size(); i++) {
                String dataCStr = dataCList.get(i);
                template.send("topic_device_data2", dataCStr);
                /*.addCallback(
                        success->{
                            //System.out.println(success);
                        },
                        failure->{
                            //System.err.println(failure.getMessage());
                            //failure.printStackTrace();
                        });*/
            }
            long endTime = System.currentTimeMillis();
            //System.out.println("花费时长(毫秒):" + (endTime - startTime));
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
 
    }
 
    @GetMapping("benchmarkTest3")
    public void benchmarkTest3(){
        long currentTimeMillis = System.currentTimeMillis();
        //生产80万条数据,然后发送
        List<DataC> dataCList = new LinkedList<>();
        for (int i = 0; i < 800000; i++) {
            DataC dataC = new DataC();
            dataC.setDevice_id(i+"");
            dataC.setDevice_type("test");
            dataC.setIdc("test");
 
            List<PointC> pointCList = new LinkedList<>();
            PointC pointC = new PointC();
            pointC.setPoint_code(i);
            pointC.setTag("test");
            pointC.setTimestamp(currentTimeMillis);
            pointC.setValue(Double.parseDouble(i+""));
            pointCList.add(pointC);
            dataC.setPoints(pointCList);
 
            dataCList.add(dataC);
        }
        long startTime = System.currentTimeMillis();
        batchTemplate.send("topic_device_data2",dataCList);
        /*for (int i = 0; i < dataCList.size(); i++) {
            pool.execute(()->{
                template.send("topic_device_data2",dataCStr).addCallback(
                        success->{
                            //System.out.println(success);
                        },
                        failure->{
                            //System.err.println(failure.getMessage());
                            //failure.printStackTrace();
                        });
            });
        }*/
        long endTime = System.currentTimeMillis();
        System.out.println("花费时长(毫秒):"+(endTime-startTime));
 
    }
 
    @PostMapping("push_data")
    public Response pushData(@RequestBody DataC dataC){
        //点位数据发送到kafka topic_device_data
        String dataCStr = JSON.toJSONString(dataC);
        template.send("topic_device_data2",dataCStr).addCallback(
                success->{
                    System.out.println(success);
                },
                failure->{
                    System.err.println(failure.getMessage());
                    failure.printStackTrace();
                });
        return new Response().set(1,"推送完成");
    }
 
}