pom.xml
@@ -42,6 +42,35 @@ <version>2.0.29</version> </dependency> <!-- 引入mongodb--> <!--<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency>--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>6.0.6</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.10</version> </dependency> <!--<dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.0.0</version> </dependency>--> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.1.2</version> </dependency> </dependencies> <build> src/main/java/com/whyc/config/KafkaConfig.java
@@ -14,7 +14,7 @@ @Bean public NewTopic batchTopic() { //return new NewTopic("topic_device_data",3,(short)3); return new NewTopic("topic_device_data2",1,(short)1); return new NewTopic("topic_device_data2",3,(short)1); } //@KafkaListener(topics = "topic_device_data") src/main/java/com/whyc/config/MybatisPlusConfig.java
New file @@ -0,0 +1,22 @@
package com.whyc.config;

import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import com.whyc.injector.CustomSqlInjector;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * MyBatis-Plus wiring for the application.
 *
 * <p>Scans {@code com.whyc.mapper} for mapper interfaces and publishes the
 * project's {@link CustomSqlInjector} and a {@link PaginationInterceptor} as beans.
 */
@MapperScan("com.whyc.mapper")
@Configuration
public class MybatisPlusConfig {

    /**
     * Publishes the injector that adds the extra batch methods to the mappers.
     *
     * @return a new {@link CustomSqlInjector}
     */
    @Bean
    public CustomSqlInjector customSqlInjector() {
        CustomSqlInjector injector = new CustomSqlInjector();
        return injector;
    }

    /**
     * Publishes the MyBatis-Plus pagination interceptor with its default settings.
     *
     * @return a new {@link PaginationInterceptor}
     */
    @Bean
    public PaginationInterceptor paginationInterceptor() {
        PaginationInterceptor interceptor = new PaginationInterceptor();
        return interceptor;
    }
}
src/main/java/com/whyc/controller/DcimNorthController.java
@@ -5,19 +5,16 @@ import com.whyc.dto.interfaceB.DataB; import com.whyc.dto.interfaceC.DataC; import com.whyc.dto.interfaceC.PointC; import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.common.KafkaFuture; import org.springframework.kafka.core.KafkaAdmin; import com.whyc.pojo.InterfaceCData; import com.whyc.service.ConfigService; import com.whyc.service.DataService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static java.lang.Thread.sleep; @@ -28,8 +25,11 @@ @Resource private KafkaTemplate template; @Resource private KafkaTemplate<Object,List<DataC>> batchTemplate; @Autowired private ConfigService configService; @Autowired private DataService dataService; /** * 这个仅测试使用,正式环境不使用 @@ -38,8 +38,25 @@ */ @PostMapping("upload_config") public Response uploadConfig(@RequestBody DataB dataB){ //不处理 //发送到数据库 configService.add(dataB); return new Response().set(1,"配置上传完成"); } @PostMapping("push_data") public Response pushData(@RequestBody InterfaceCData data){ //发送到数据库 dataService.add(data); //发送到kafka String jsonStr = JSON.toJSONString(data); template.send("topic_device_data2",jsonStr); return new Response().set(1,"推送完成"); } @GetMapping("dataList") public Response getDataList(){ //发送到数据库 return dataService.getDataList(); } @GetMapping("benchmarkTest") @@ -90,60 +107,5 @@ } @GetMapping("benchmarkTest3") public void benchmarkTest3(){ long currentTimeMillis = System.currentTimeMillis(); //生产80万条数据,然后发送 List<DataC> dataCList = new LinkedList<>(); for (int i = 0; i < 800000; i++) { DataC dataC = new DataC(); dataC.setDevice_id(i+""); dataC.setDevice_type("test"); dataC.setIdc("test"); 
List<PointC> pointCList = new LinkedList<>(); PointC pointC = new PointC(); pointC.setPoint_code(i); pointC.setTag("test"); pointC.setTimestamp(currentTimeMillis); pointC.setValue(Double.parseDouble(i+"")); pointCList.add(pointC); dataC.setPoints(pointCList); dataCList.add(dataC); } long startTime = System.currentTimeMillis(); batchTemplate.send("topic_device_data2",dataCList); /*for (int i = 0; i < dataCList.size(); i++) { pool.execute(()->{ template.send("topic_device_data2",dataCStr).addCallback( success->{ //System.out.println(success); }, failure->{ //System.err.println(failure.getMessage()); //failure.printStackTrace(); }); }); }*/ long endTime = System.currentTimeMillis(); System.out.println("花费时长(毫秒):"+(endTime-startTime)); } @PostMapping("push_data") public Response pushData(@RequestBody DataC dataC){ //点位数据发送到kafka topic_device_data String dataCStr = JSON.toJSONString(dataC); template.send("topic_device_data2",dataCStr).addCallback( success->{ System.out.println(success); }, failure->{ System.err.println(failure.getMessage()); failure.printStackTrace(); }); return new Response().set(1,"推送完成"); } } src/main/java/com/whyc/dto/interfaceB/DataB.java
@@ -1,24 +1,27 @@ package com.whyc.dto.interfaceB; import com.whyc.pojo.InterfaceBDeviceConfig; import com.whyc.pojo.InterfaceBMode; import java.util.List; public class DataB { private Mode mode; private List<Device> devices; private InterfaceBMode mode; private List<InterfaceBDeviceConfig> devices; public Mode getMode() { public InterfaceBMode getMode() { return mode; } public void setMode(Mode mode) { public void setMode(InterfaceBMode mode) { this.mode = mode; } public List<Device> getDevices() { public List<InterfaceBDeviceConfig> getDevices() { return devices; } public void setDevices(List<Device> devices) { public void setDevices(List<InterfaceBDeviceConfig> devices) { this.devices = devices; } } src/main/java/com/whyc/injector/CustomSqlInjector.java
New file @@ -0,0 +1,24 @@
package com.whyc.injector;

import com.baomidou.mybatisplus.core.injector.AbstractMethod;
import com.baomidou.mybatisplus.core.injector.DefaultSqlInjector;
import com.baomidou.mybatisplus.extension.injector.methods.additional.AlwaysUpdateSomeColumnById;
import com.baomidou.mybatisplus.extension.injector.methods.additional.InsertBatchSomeColumn;

import java.util.List;

/**
 * Custom SQL injector: extends the default MyBatis-Plus method list so the
 * additional statements are recognised and executed automatically.
 */
public class CustomSqlInjector extends DefaultSqlInjector {

    /**
     * Returns the default injected methods plus the two additional ones.
     *
     * @param mapperClass mapper interface the methods are being collected for
     * @return the parent's list with {@link InsertBatchSomeColumn} and
     *         {@link AlwaysUpdateSomeColumnById} appended, in that order
     */
    @Override
    public List<AbstractMethod> getMethodList(Class<?> mapperClass) {
        // Start from the method list supplied by the parent injector.
        final List<AbstractMethod> injected = super.getMethodList(mapperClass);

        // Batch insert.
        final AbstractMethod batchInsert = new InsertBatchSomeColumn();
        injected.add(batchInsert);

        // Update-by-id variant shipped with MyBatis-Plus.
        final AbstractMethod alwaysUpdate = new AlwaysUpdateSomeColumnById();
        injected.add(alwaysUpdate);

        return injected;
    }
}
src/main/java/com/whyc/mapper/ConfigMapper.java
New file @@ -0,0 +1,4 @@
package com.whyc.mapper;

/**
 * Mapper interface that currently declares no methods and extends nothing.
 * It is injected into ConfigService, which does not call it.
 */
public interface ConfigMapper {
}
src/main/java/com/whyc/mapper/CustomMapper.java
New file @@ -0,0 +1,17 @@
package com.whyc.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

import java.util.Collection;

/**
 * Base mapper for the project's entities: {@link BaseMapper} plus a batch insert.
 *
 * <p>The method name matches the {@code InsertBatchSomeColumn} method that
 * {@code CustomSqlInjector} adds to the injected method list.
 *
 * @param <T> entity type handled by the mapper
 */
@SuppressWarnings("all")
public interface CustomMapper<T> extends BaseMapper<T> {

    /**
     * Batch insert.
     *
     * @param entityList entities to insert
     * @return presumably the number of affected rows (TODO confirm against MyBatis-Plus)
     */
    Integer insertBatchSomeColumn(Collection<T> entityList);
}
src/main/java/com/whyc/mapper/InterfaceBDeviceConfigMapper.java
New file @@ -0,0 +1,6 @@
package com.whyc.mapper;

import com.whyc.pojo.InterfaceBDeviceConfig;

/**
 * Mapper for {@link InterfaceBDeviceConfig}; declares no methods of its own and
 * inherits everything from {@link CustomMapper}.
 */
public interface InterfaceBDeviceConfigMapper extends CustomMapper<InterfaceBDeviceConfig>{
}
src/main/java/com/whyc/mapper/InterfaceBModeMapper.java
New file @@ -0,0 +1,7 @@
package com.whyc.mapper;

import com.whyc.pojo.InterfaceBMode;

/**
 * Mapper for {@link InterfaceBMode}; declares no methods of its own and
 * inherits everything from {@link CustomMapper}.
 */
public interface InterfaceBModeMapper extends CustomMapper<InterfaceBMode>{

}
src/main/java/com/whyc/mapper/InterfaceBPointMapper.java
New file @@ -0,0 +1,6 @@
package com.whyc.mapper;

import com.whyc.pojo.InterfaceBPoint;

/**
 * Mapper for {@link InterfaceBPoint}; declares no methods of its own and
 * inherits everything from {@link CustomMapper}.
 */
public interface InterfaceBPointMapper extends CustomMapper<InterfaceBPoint>{
}
src/main/java/com/whyc/mapper/InterfaceCDataMapper.java
New file @@ -0,0 +1,11 @@
package com.whyc.mapper;

import com.whyc.pojo.InterfaceCData;

import java.util.List;

/**
 * Mapper for {@link InterfaceCData}: the inherited {@link CustomMapper} methods
 * plus one query whose SQL is defined in {@code mapper/InterfaceCDataMapper.xml}.
 */
public interface InterfaceCDataMapper extends CustomMapper<InterfaceCData> {

    /**
     * Runs the {@code getDataList} statement, which joins the data table with
     * the point table and maps the rows through {@code Map_InterfaceCData}.
     *
     * @return data records with their {@code points} collection populated
     */
    List<InterfaceCData> getDataList();
}
src/main/java/com/whyc/mapper/InterfaceCPointMapper.java
New file @@ -0,0 +1,6 @@
package com.whyc.mapper;

import com.whyc.pojo.InterfaceCPoint;

/**
 * Mapper for {@link InterfaceCPoint}; declares no methods of its own and
 * inherits everything from {@link CustomMapper}.
 */
public interface InterfaceCPointMapper extends CustomMapper<InterfaceCPoint> {
}
src/main/java/com/whyc/pojo/InterfaceBDeviceConfig.java
New file @@ -0,0 +1,94 @@
package com.whyc.pojo;

import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;

import java.util.Date;
import java.util.List;

/**
 * Entity for table {@code db_dh.interface_b_device_config}.
 *
 * <p>Property names follow the snake_case column names. {@code points} is not
 * a column; it carries the device's point list alongside the entity.
 */
@TableName(value = "interface_b_device_config", schema = "db_dh")
public class InterfaceBDeviceConfig {

    private Long id;
    private String guid;
    private String tag;
    private String name;
    private String device_type;
    private String path;
    /** Set by ConfigService to the id of the inserted InterfaceBMode. */
    private Long mode_id;
    private Date create_time;
    /** Excluded from table mapping. */
    @TableField(exist = false)
    private List<InterfaceBPoint> points;

    // Accessors, grouped per field in declaration order.

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getGuid() { return guid; }
    public void setGuid(String guid) { this.guid = guid; }

    public String getTag() { return tag; }
    public void setTag(String tag) { this.tag = tag; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getDevice_type() { return device_type; }
    public void setDevice_type(String device_type) { this.device_type = device_type; }

    public String getPath() { return path; }
    public void setPath(String path) { this.path = path; }

    public Long getMode_id() { return mode_id; }
    public void setMode_id(Long mode_id) { this.mode_id = mode_id; }

    public Date getCreate_time() { return create_time; }
    public void setCreate_time(Date create_time) { this.create_time = create_time; }

    public List<InterfaceBPoint> getPoints() { return points; }
    public void setPoints(List<InterfaceBPoint> points) { this.points = points; }
}
src/main/java/com/whyc/pojo/InterfaceBMode.java
New file @@ -0,0 +1,47 @@
package com.whyc.pojo;

import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;

import java.util.Date;

/**
 * Entity for table {@code db_dh.interface_b_mode}.
 *
 * <p>Property names follow the snake_case column names, except
 * {@code interval}, which is mapped to column {@code interval_time}.
 */
@TableName(value = "interface_b_mode", schema = "db_dh")
public class InterfaceBMode {

    private Long id;
    private String transfer_mode;
    /** Stored in column {@code interval_time}. */
    @TableField("interval_time")
    private Integer interval;
    private Date create_time;

    // Accessors, grouped per field in declaration order.

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getTransfer_mode() { return transfer_mode; }
    public void setTransfer_mode(String transfer_mode) { this.transfer_mode = transfer_mode; }

    public Integer getInterval() { return interval; }
    public void setInterval(Integer interval) { this.interval = interval; }

    public Date getCreate_time() { return create_time; }
    public void setCreate_time(Date create_time) { this.create_time = create_time; }
}
src/main/java/com/whyc/pojo/InterfaceBPoint.java
New file @@ -0,0 +1,72 @@
package com.whyc.pojo;

import com.baomidou.mybatisplus.annotation.TableName;

import java.util.Date;

/**
 * Entity for table {@code db_dh.interface_b_point}.
 *
 * <p>Property names follow the snake_case column names.
 */
@TableName(value = "interface_b_point", schema = "db_dh")
public class InterfaceBPoint {

    private Long id;
    private String tag;
    private String name;
    private Integer point_type;
    private String unit;
    /** Set by ConfigService to the id of the owning InterfaceBDeviceConfig. */
    private Long device_config_id;
    private Date create_time;

    // Accessors, grouped per field in declaration order.

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getTag() { return tag; }
    public void setTag(String tag) { this.tag = tag; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public Integer getPoint_type() { return point_type; }
    public void setPoint_type(Integer point_type) { this.point_type = point_type; }

    public String getUnit() { return unit; }
    public void setUnit(String unit) { this.unit = unit; }

    public Long getDevice_config_id() { return device_config_id; }
    public void setDevice_config_id(Long device_config_id) { this.device_config_id = device_config_id; }

    public Date getCreate_time() { return create_time; }
    public void setCreate_time(Date create_time) { this.create_time = create_time; }
}
src/main/java/com/whyc/pojo/InterfaceCData.java
New file @@ -0,0 +1,67 @@
package com.whyc.pojo;

import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;

import java.util.Date;
import java.util.List;

/**
 * Entity for table {@code db_dh.interface_c_data}.
 *
 * <p>Property names follow the snake_case column names. {@code points} is not
 * a column; it carries the record's point list alongside the entity.
 */
@TableName(value = "interface_c_data", schema = "db_dh")
public class InterfaceCData {

    private Long id;
    private String device_id;
    private String idc;
    private String device_type;
    private Date create_time;
    /** Excluded from table mapping. */
    @TableField(exist = false)
    private List<InterfaceCPoint> points;

    // Accessors, grouped per field in declaration order.

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getDevice_id() { return device_id; }
    public void setDevice_id(String device_id) { this.device_id = device_id; }

    public String getIdc() { return idc; }
    public void setIdc(String idc) { this.idc = idc; }

    public String getDevice_type() { return device_type; }
    public void setDevice_type(String device_type) { this.device_type = device_type; }

    public Date getCreate_time() { return create_time; }
    public void setCreate_time(Date create_time) { this.create_time = create_time; }

    public List<InterfaceCPoint> getPoints() { return points; }
    public void setPoints(List<InterfaceCPoint> points) { this.points = points; }
}
src/main/java/com/whyc/pojo/InterfaceCPoint.java
New file @@ -0,0 +1,72 @@
package com.whyc.pojo;

import com.baomidou.mybatisplus.annotation.TableName;

import java.util.Date;

/**
 * Entity for table {@code db_dh.interface_c_point}.
 *
 * <p>Property names follow the snake_case column names.
 */
@TableName(value = "interface_c_point", schema = "db_dh")
public class InterfaceCPoint {

    private Long id;
    private Integer point_code;
    private String tag;
    private Double value;
    private Long timestamp;
    /** Set by DataService to the id of the owning InterfaceCData. */
    private Long data_id;
    private Date create_time;

    // Accessors, grouped per field in declaration order.

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public Integer getPoint_code() { return point_code; }
    public void setPoint_code(Integer point_code) { this.point_code = point_code; }

    public String getTag() { return tag; }
    public void setTag(String tag) { this.tag = tag; }

    public Double getValue() { return value; }
    public void setValue(Double value) { this.value = value; }

    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    public Long getData_id() { return data_id; }
    public void setData_id(Long data_id) { this.data_id = data_id; }

    public Date getCreate_time() { return create_time; }
    public void setCreate_time(Date create_time) { this.create_time = create_time; }
}
src/main/java/com/whyc/service/ConfigService.java
New file @@ -0,0 +1,48 @@
package com.whyc.service;

import com.whyc.dto.interfaceB.DataB;
import com.whyc.mapper.ConfigMapper;
import com.whyc.mapper.InterfaceBDeviceConfigMapper;
import com.whyc.mapper.InterfaceBModeMapper;
import com.whyc.mapper.InterfaceBPointMapper;
import com.whyc.pojo.InterfaceBDeviceConfig;
import com.whyc.pojo.InterfaceBMode;
import com.whyc.pojo.InterfaceBPoint;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Objects;

/**
 * Persists an uploaded interface-B configuration: one mode row, its device
 * configuration rows, and each device's point rows.
 */
@Service
public class ConfigService {

    @Resource
    private ConfigMapper mapper;

    @Resource
    private InterfaceBModeMapper interfaceBModeMapper;

    @Resource
    private InterfaceBDeviceConfigMapper interfaceBDeviceConfigMapper;

    @Resource
    private InterfaceBPointMapper interfaceBPointMapper;

    /**
     * Stores the mode, then every device (linked to the mode), then every
     * device's points (linked to the device). All rows share one creation time.
     *
     * <p>A missing or empty {@code devices} list, or a device without points,
     * is accepted and simply results in no rows for that level.
     *
     * <p>NOTE(review): the inserts are not wrapped in a transaction, so a
     * failure part-way leaves earlier rows in place; consider making this
     * method transactional.
     *
     * @param dataB uploaded configuration; must not be null and must carry a mode
     * @throws NullPointerException if {@code dataB} or its mode is null
     */
    public void add(DataB dataB) {
        Objects.requireNonNull(dataB, "dataB must not be null");
        InterfaceBMode mode = Objects.requireNonNull(dataB.getMode(), "dataB.mode must not be null");

        Date now = new Date();
        mode.setCreate_time(now);
        interfaceBModeMapper.insert(mode);

        List<InterfaceBDeviceConfig> devices = dataB.getDevices();
        if (devices == null || devices.isEmpty()) {
            return;
        }

        for (InterfaceBDeviceConfig device : devices) {
            // mode.getId() is read after the insert; presumably filled in from the
            // generated key (id-type AUTO in the MyBatis-Plus config) - confirm.
            device.setMode_id(mode.getId());
            device.setCreate_time(now);
            interfaceBDeviceConfigMapper.insert(device);

            List<InterfaceBPoint> points = device.getPoints();
            if (points == null || points.isEmpty()) {
                // Nothing to store; an empty batch would render an INSERT with no VALUES tuples.
                continue;
            }
            for (InterfaceBPoint point : points) {
                point.setDevice_config_id(device.getId());
                point.setCreate_time(now);
            }
            interfaceBPointMapper.insertBatchSomeColumn(points);
        }
    }
}
src/main/java/com/whyc/service/DataService.java
New file @@ -0,0 +1,40 @@
package com.whyc.service;

import com.whyc.dto.Response;
import com.whyc.dto.interfaceC.DataC;
import com.whyc.dto.interfaceC.PointC;
import com.whyc.mapper.InterfaceCDataMapper;
import com.whyc.mapper.InterfaceCPointMapper;
import com.whyc.pojo.InterfaceCData;
import com.whyc.pojo.InterfaceCPoint;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Objects;

/**
 * Persists pushed interface-C data records with their points and reads them back.
 */
@Service
public class DataService {

    @Resource
    private InterfaceCDataMapper interfaceCDataMapper;

    @Resource
    private InterfaceCPointMapper interfaceCPointMapper;

    /**
     * Stores the data record, then its points linked to the record. All rows
     * share one creation time. A record with a missing or empty point list is
     * stored on its own.
     *
     * <p>NOTE(review): the two inserts are not wrapped in a transaction, so a
     * failed point insert leaves the data row in place; consider making this
     * method transactional.
     *
     * @param data pushed record; must not be null
     * @throws NullPointerException if {@code data} is null
     */
    public void add(InterfaceCData data) {
        Objects.requireNonNull(data, "data must not be null");

        Date now = new Date();
        data.setCreate_time(now);
        interfaceCDataMapper.insert(data);

        List<InterfaceCPoint> points = data.getPoints();
        if (points == null || points.isEmpty()) {
            // Nothing to store; an empty batch would render an INSERT with no VALUES tuples.
            return;
        }
        for (InterfaceCPoint point : points) {
            point.setCreate_time(now);
            // data.getId() is read after the insert; presumably filled in from the
            // generated key (id-type AUTO in the MyBatis-Plus config) - confirm.
            point.setData_id(data.getId());
        }
        interfaceCPointMapper.insertBatchSomeColumn(points);
    }

    /**
     * Loads the data records together with their points.
     *
     * @return a response with code 1 carrying the list returned by the mapper
     */
    public Response getDataList() {
        List<InterfaceCData> dataList = interfaceCDataMapper.getDataList();
        return new Response().set(1,dataList);
    }
}
src/main/resources/application-dev.yml
@@ -12,12 +12,24 @@ max-http-form-post-size: 102400000 message.max.bytes: 156466826 spring: datasource: type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://192.168.10.79:3360/db_dh?characterEncoding=utf8&serverTimezone=Asia/Shanghai&allowMultiQueries=true&rewriteBatchedStatements=true # url: jdbc:mysql://192.168.10.80:3360/db_user?characterEncoding=utf8&serverTimezone=Asia/Shanghai&allowMultiQueries=true&rewriteBatchedStatements=true # url: jdbc:mysql://118.89.139.230:3360/db_user?characterEncoding=utf8&serverTimezone=Asia/Shanghai&allowMultiQueries=true&rewriteBatchedStatements=true username: root password: lmx8688139 maxIdel: 60 initialPoolSize: 2 minPoolSize: 2 maxPoolSize: 500 kafka: # 指定kafka server的地址,集群配多个,中间,逗号隔开 bootstrap-servers: broker-2:9092 # broker-1:9092 # broker-2:9092 # broker-3:9092 bootstrap-servers: - broker-1:9092 - broker-2:9092 - broker-3:9092 # - 192.168.10.143:9092 # - 192.168.10.144:9092 # - 192.168.10.145:9092 @@ -62,4 +74,16 @@ auto-offset-reset: earliest #latest 最新的位置 , earliest最早的位置 auto-commit-interval: 100 #自动提交offset频率 100毫秒 knife: enable: true enable: true mybatis-plus: typeAliasesPackage: com.whyc.pojo,com.whyc.dto mapper-locations: classpath:mapper/*.xml global-config: db-config: #主键类型 0:"数据库ID自增", 1:"用户输入ID",2:"全局唯一ID (数字类型唯一ID)", 3:"全局唯一ID UUID"; id-type: AUTO #驼峰下划线转换 table-underline: true configuration: #配置返回数据库(column下划线命名&&返回java实体是驼峰命名),自动匹配无需as(没开启这个,SQL需要写as: select user_id as userId) map-underscore-to-camel-case: true src/main/resources/mapper/0Mapper.xml
New file @@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!--
  Empty mapper: declares no statements or result maps.
  NOTE(review): the namespace is the mapper package, not a mapper interface;
  presumably this file is a template for new mapper XML files - confirm.
-->
<mapper namespace="com.whyc.mapper" >

</mapper>
src/main/resources/mapper/InterfaceCDataMapper.xml
New file @@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.whyc.mapper.InterfaceCDataMapper" >

    <!--
      Maps the data/point join onto InterfaceCData with a nested points list.
      The <id> elements give MyBatis the row identity: without them, parent rows
      are grouped by (device_id, idc, device_type), so separate pushes from the
      same device collapse into one object, and identical points are merged too.
    -->
    <resultMap id="Map_InterfaceCData" type="com.whyc.pojo.InterfaceCData">
        <id property="id" column="data_pk"/>
        <result property="device_id" column="device_id"/>
        <result property="idc" column="idc"/>
        <result property="device_type" column="device_type"/>
        <collection property="points" ofType="com.whyc.pojo.InterfaceCPoint">
            <id property="id" column="point_pk"/>
            <result property="point_code" column="point_code"/>
            <result property="tag" column="tag"/>
            <result property="value" column="value"/>
            <result property="timestamp" column="timestamp"/>
        </collection>
    </resultMap>

    <!--
      Explicit column list: both tables have an id column, so each primary key
      gets its own alias instead of relying on the duplicate labels of SELECT *.
      Still an inner join, so data rows without points are not returned.
    -->
    <select id="getDataList" resultMap="Map_InterfaceCData">
        SELECT d.id AS data_pk,
               d.device_id,
               d.idc,
               d.device_type,
               p.id AS point_pk,
               p.point_code,
               p.tag,
               p.value,
               p.timestamp
        FROM db_dh.interface_c_data d
        INNER JOIN db_dh.interface_c_point p ON d.id = p.data_id
    </select>
</mapper>