From 77a48167b508f4aef5816251f718374cd0b9d52a Mon Sep 17 00:00:00 2001 From: xs Date: Fri, 21 Mar 2025 17:39:01 +0800 Subject: [PATCH] =?UTF-8?q?1.2.9=EF=BC=9A=20=E6=97=B6=E5=BA=8F=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93tsdb=E6=94=AF=E6=8C=811.x=E5=92=8C2.x?= =?UTF-8?q?=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ruoyi-modules/hwmom-tsdb/pom.xml | 5 ++ .../tsdb/component/InfluxDbClient.java | 30 +++++++ .../component/impl/InfluxDb1xClientImpl.java | 86 ++++++++++++++++++ .../component/impl/InfluxDb2xClientImpl.java | 88 +++++++++++++++++++ .../dromara/tsdb/config/InfluxDbConfig.java | 33 +++---- .../tsdb/controller/InfluxDbController.java | 10 ++- .../tsdb/service/IInfluxDbService.java | 17 ++-- .../service/impl/InfluxDbServiceImpl.java | 59 ++++++++----- 8 files changed, 275 insertions(+), 53 deletions(-) create mode 100644 ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/InfluxDbClient.java create mode 100644 ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/impl/InfluxDb1xClientImpl.java create mode 100644 ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/impl/InfluxDb2xClientImpl.java diff --git a/ruoyi-modules/hwmom-tsdb/pom.xml b/ruoyi-modules/hwmom-tsdb/pom.xml index fd706974..0f719192 100644 --- a/ruoyi-modules/hwmom-tsdb/pom.xml +++ b/ruoyi-modules/hwmom-tsdb/pom.xml @@ -38,6 +38,11 @@ 2.23 + + com.influxdb + influxdb-client-java + 6.12.0 + diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/InfluxDbClient.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/InfluxDbClient.java new file mode 100644 index 00000000..1b8987e1 --- /dev/null +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/InfluxDbClient.java @@ -0,0 +1,30 @@ +package org.dromara.tsdb.component; + +import java.util.List; +import java.util.Map; + +public interface InfluxDbClient { + /** + * 插入数据 + * @param measurement + * @param tags + * @param fields + */ + void writeData(String measurement, Map tags, Map fields); + + /** + * 插入数据 + * @param measurement + * @param tags + * @param fields + * @param timestamp + */ + void writeData(String measurement, Map tags, Map fields, long timestamp); + + /** + * 获取数据 + * @param query + * @return + */ + List> queryData(String query); +} diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/impl/InfluxDb1xClientImpl.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/impl/InfluxDb1xClientImpl.java new file mode 100644 index 00000000..97e08a99 --- /dev/null +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/impl/InfluxDb1xClientImpl.java @@ -0,0 +1,86 @@ +package org.dromara.tsdb.component.impl; + +import com.influxdb.client.domain.WritePrecision; +import org.dromara.tsdb.component.InfluxDbClient; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@Component +public class InfluxDb1xClientImpl implements InfluxDbClient { + + private final InfluxDB influxDB; + + public InfluxDb1xClientImpl( + @Value("${influxdb.url}") String url, + @Value("${influxdb.username}") String username, + @Value("${influxdb.password}") String password, + @Value("${influxdb.database}") String database) { + this.influxDB = InfluxDBFactory.connect(url, username, password); + this.influxDB.setDatabase(database); + } + + @Override + public void writeData(String measurement, Map tags, Map fields) { + Point.Builder pointBuilder = Point.measurement(measurement); + tags.forEach(pointBuilder::tag); + // 处理 fields,确保类型正确 + fields.forEach((key, value) -> { + if (value instanceof Number) { + pointBuilder.addField(key, (Number) value); + } else if (value instanceof String) { + pointBuilder.addField(key, (String) value); + } else if (value instanceof Boolean) { + pointBuilder.addField(key, (Boolean) value); + } else { + throw new IllegalArgumentException("Unsupported field type: " + value.getClass().getName()); + } + }); + influxDB.write(pointBuilder.build()); + } + + @Override + public void writeData(String measurement, Map tags, Map fields, long timestamp) { + Point.Builder pointBuilder = Point.measurement(measurement).time(timestamp, TimeUnit.MILLISECONDS); + tags.forEach(pointBuilder::tag); + // 处理 fields,确保类型正确 + fields.forEach((key, value) -> { + if (value instanceof Number) { + pointBuilder.addField(key, (Number) value); + } else if (value instanceof String) { + pointBuilder.addField(key, (String) value); + } else if (value instanceof Boolean) { + pointBuilder.addField(key, (Boolean) value); + } else { + throw new IllegalArgumentException("Unsupported field type: " + value.getClass().getName()); + } + }); + influxDB.write(pointBuilder.build()); + } + + + @Override + public List> queryData(String query) { + QueryResult result = influxDB.query(new Query(query)); + return result.getResults().stream() + .flatMap(r -> r.getSeries().stream()) + .flatMap(s -> s.getValues().stream()) + .map(values -> { + Map map = new HashMap<>(); + for (int i = 0; i < values.size(); i++) { + System.out.println("values: " + values.get(i)); + } + return map; + }) + .toList(); + } +} diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/impl/InfluxDb2xClientImpl.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/impl/InfluxDb2xClientImpl.java new file mode 100644 index 00000000..e4459045 --- /dev/null +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/component/impl/InfluxDb2xClientImpl.java @@ -0,0 +1,88 @@ +package org.dromara.tsdb.component.impl; + +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.InfluxDBClientFactory; +import com.influxdb.client.WriteApi; +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.write.Point; +import com.influxdb.query.FluxRecord; +import com.influxdb.query.FluxTable; +import org.dromara.tsdb.component.InfluxDbClient; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Component +public class InfluxDb2xClientImpl implements InfluxDbClient { + + private final InfluxDBClient influxDBClient; + private final String bucket; + + public InfluxDb2xClientImpl( + @Value("${influxdb.url}") String url, + @Value("${influxdb.token}") String token, + @Value("${influxdb.org}") String org, + @Value("${influxdb.bucket}") String bucket) { + this.influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket); + this.bucket = bucket; + } + + @Override + public void writeData(String measurement, Map tags, Map fields) { + try (WriteApi writeApi = influxDBClient.getWriteApi()) { + Point point = Point.measurement(measurement); + tags.forEach(point::addTag); + // 处理 fields,确保类型正确 + fields.forEach((key, value) -> { + if (value instanceof Number) { + point.addField(key, (Number) value); + } else if (value instanceof String) { + point.addField(key, (String) value); + } else if (value instanceof Boolean) { + point.addField(key, (Boolean) value); + } else { + throw new IllegalArgumentException("Unsupported field type: " + value.getClass().getName()); + } + }); + writeApi.writePoint(point); + } + } + + @Override + public void writeData(String measurement, Map tags, Map fields,long timestamp) { + try (WriteApi writeApi = influxDBClient.getWriteApi()) { + Point point = Point.measurement(measurement) + .time(timestamp,WritePrecision.MS);//设置时间戳 + tags.forEach(point::addTag); + // 处理 fields,确保类型正确 + fields.forEach((key, value) -> { + if (value instanceof Number) { + point.addField(key, (Number) value); + } else if (value instanceof String) { + point.addField(key, (String) value); + } else if (value instanceof Boolean) { + point.addField(key, (Boolean) value); + } else { + throw new IllegalArgumentException("Unsupported field type: " + value.getClass().getName()); + } + }); + writeApi.writePoint(point); + } + } + + @Override + public List> queryData(String query) { + List tables = influxDBClient.getQueryApi().query(query); + return tables.stream() + .flatMap(table -> table.getRecords().stream()) + .map(record -> { + Map map = new HashMap<>(); + record.getValues().forEach((key, value) -> map.put(key, value)); + return map; + }) + .toList(); + } +} diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/config/InfluxDbConfig.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/config/InfluxDbConfig.java index 2e5e7f27..72dd8112 100644 --- a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/config/InfluxDbConfig.java +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/config/InfluxDbConfig.java @@ -1,7 +1,8 @@ package org.dromara.tsdb.config; -import org.influxdb.InfluxDB; -import org.influxdb.InfluxDBFactory; +import org.dromara.tsdb.component.InfluxDbClient; +import org.dromara.tsdb.component.impl.InfluxDb1xClientImpl; +import org.dromara.tsdb.component.impl.InfluxDb2xClientImpl; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -9,22 +10,22 @@ import org.springframework.context.annotation.Configuration; @Configuration public class InfluxDbConfig { - @Value("${influxdb.url}") - private String influxDBUrl; + @Value("${influxdb.version}") + private String version; - @Value("${influxdb.username}") - private String username; - - @Value("${influxdb.password}") - private String password; - - @Value("${influxdb.database}") - private String database; + @Value("${influxdb.bucket}") + private String bucket; @Bean - public InfluxDB influxDB() { - InfluxDB influxDB = InfluxDBFactory.connect(influxDBUrl, username, password); - influxDB.setDatabase(database); - return influxDB; + public InfluxDbClient influxDbClient( + InfluxDb1xClientImpl influxDb1xClient, + InfluxDb2xClientImpl influxDb2xClient) { + if ("1.x".equals(version)) { + return influxDb1xClient; + } else if ("2.x".equals(version)) { + return influxDb2xClient; + } else { + throw new IllegalArgumentException("Unsupported InfluxDB version: " + version); + } } } diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/controller/InfluxDbController.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/controller/InfluxDbController.java index 7d5c7b9d..95422adf 100644 --- a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/controller/InfluxDbController.java +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/controller/InfluxDbController.java @@ -6,6 +6,9 @@ import org.influxdb.dto.QueryResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; +import java.util.List; +import java.util.Map; + @RestController @RequestMapping("/influx") public class InfluxDbController { @@ -18,7 +21,7 @@ public class InfluxDbController { */ @PostMapping("/write") public String writeData(@RequestBody InfluxMeasurementBo influxMeasurementBo) { - influxDbService.writeData(influxMeasurementBo); + influxDbService.writeData(); return "Data written to InfluxDB successfully!"; } @@ -26,7 +29,8 @@ public class InfluxDbController { * 查询数据 */ @GetMapping("/query") - public QueryResult queryData(@RequestParam String query) { - return influxDbService.queryData(query); + public List> queryData(@RequestParam String query) { + + return influxDbService.queryData(); } } diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/IInfluxDbService.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/IInfluxDbService.java index ecd2b989..fc70b233 100644 --- a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/IInfluxDbService.java +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/IInfluxDbService.java @@ -3,19 +3,12 @@ package org.dromara.tsdb.service; import org.dromara.tsdb.domain.bo.InfluxMeasurementBo; import org.influxdb.dto.QueryResult; +import java.util.List; +import java.util.Map; + public interface IInfluxDbService { - /** - * 写入数据到InfluxDB - * @param influxMeasurementBo - */ - public void writeData(InfluxMeasurementBo influxMeasurementBo); + public void writeData(); - /** - * 查询数据 - * - * @param query 查询语句 - * @return 查询结果 - */ - public QueryResult queryData(String query); + public List> queryData(); } diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/impl/InfluxDbServiceImpl.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/impl/InfluxDbServiceImpl.java index de198af2..f2d625a9 100644 --- a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/impl/InfluxDbServiceImpl.java +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/impl/InfluxDbServiceImpl.java @@ -1,5 +1,6 @@ package org.dromara.tsdb.service.impl; +import org.dromara.tsdb.component.InfluxDbClient; import org.dromara.tsdb.domain.bo.InfluxMeasurementBo; import org.dromara.tsdb.service.IInfluxDbService; import org.influxdb.InfluxDB; @@ -9,38 +10,52 @@ import org.influxdb.dto.QueryResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; @Service public class InfluxDbServiceImpl implements IInfluxDbService { @Autowired - private InfluxDB influxDB; + private InfluxDbClient influxDbClient; - /** - * 写入数据到InfluxDB - * @param influxMeasurementBo - */ @Override - public void writeData(InfluxMeasurementBo influxMeasurementBo) { - Point point = Point.measurement(influxMeasurementBo.getMeasurement()) - .tag(influxMeasurementBo.getTagKey(), influxMeasurementBo.getTagValue()) - .addField(influxMeasurementBo.getFieldKey(), influxMeasurementBo.getFieldValue()) - .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) - .build(); - influxDB.write(point); + public void writeData() { + Map tags = Map.of("device_id", "device331"); + Map fields = Map.of("value", 33.6); + influxDbClient.writeData("temperature", tags, fields); } - /** - * 查询数据 - * - * @param query 查询语句 - * @return 查询结果 - */ @Override - public QueryResult queryData(String query) { - QueryResult queryResult = influxDB.query(new Query(query)); - System.out.println(queryResult.getResults()); - return queryResult; + public List> queryData() { + String query = "from(bucket: \"hwmom\") |> range(start: -7d) |> filter(fn: (r) => r._measurement == \"temperature\")"; + List> queryData= influxDbClient.queryData(query); + StringBuilder sb = new StringBuilder(); + int index = 1; + + for(Map map : queryData) { + if(map == null) { + sb.append("发现空Map对象\n\n"); + continue; + } + + sb.append("第").append(index++).append("条记录:\n"); + + for(Map.Entry entry : map.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + + sb.append(" ").append(key) + .append(" = ") + .append(value != null ? value.toString() : "null") + .append("\n"); + } + sb.append("\n"); + } + + System.out.println(sb.toString()); + + return queryData; } }