时序数据库tsdb支持1.x和2.x版本
master
xs 5 days ago
parent bc05582caf
commit 77a48167b5

@ -38,6 +38,11 @@
<version>2.23</version>
</dependency>
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>6.12.0</version> <!-- 请使用最新版本 -->
</dependency>
<!-- RuoYi Common Log -->
<dependency>

@ -0,0 +1,30 @@
package org.dromara.tsdb.component;
import java.util.List;
import java.util.Map;
public interface InfluxDbClient {
/**
*
* @param measurement
* @param tags
* @param fields
*/
void writeData(String measurement, Map<String, String> tags, Map<String, Object> fields);
/**
*
* @param measurement
* @param tags
* @param fields
* @param timestamp
*/
void writeData(String measurement, Map<String, String> tags, Map<String, Object> fields, long timestamp);
/**
*
* @param query
* @return
*/
List<Map<String, Object>> queryData(String query);
}

@ -0,0 +1,86 @@
package org.dromara.tsdb.component.impl;
import com.influxdb.client.domain.WritePrecision;
import org.dromara.tsdb.component.InfluxDbClient;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Component
public class InfluxDb1xClientImpl implements InfluxDbClient {
private final InfluxDB influxDB;
public InfluxDb1xClientImpl(
@Value("${influxdb.url}") String url,
@Value("${influxdb.username}") String username,
@Value("${influxdb.password}") String password,
@Value("${influxdb.database}") String database) {
this.influxDB = InfluxDBFactory.connect(url, username, password);
this.influxDB.setDatabase(database);
}
@Override
public void writeData(String measurement, Map<String, String> tags, Map<String, Object> fields) {
Point.Builder pointBuilder = Point.measurement(measurement);
tags.forEach(pointBuilder::tag);
// 处理 fields确保类型正确
fields.forEach((key, value) -> {
if (value instanceof Number) {
pointBuilder.addField(key, (Number) value);
} else if (value instanceof String) {
pointBuilder.addField(key, (String) value);
} else if (value instanceof Boolean) {
pointBuilder.addField(key, (Boolean) value);
} else {
throw new IllegalArgumentException("Unsupported field type: " + value.getClass().getName());
}
});
influxDB.write(pointBuilder.build());
}
@Override
public void writeData(String measurement, Map<String, String> tags, Map<String, Object> fields, long timestamp) {
Point.Builder pointBuilder = Point.measurement(measurement).time(timestamp, TimeUnit.MILLISECONDS);
tags.forEach(pointBuilder::tag);
// 处理 fields确保类型正确
fields.forEach((key, value) -> {
if (value instanceof Number) {
pointBuilder.addField(key, (Number) value);
} else if (value instanceof String) {
pointBuilder.addField(key, (String) value);
} else if (value instanceof Boolean) {
pointBuilder.addField(key, (Boolean) value);
} else {
throw new IllegalArgumentException("Unsupported field type: " + value.getClass().getName());
}
});
influxDB.write(pointBuilder.build());
}
@Override
public List<Map<String, Object>> queryData(String query) {
QueryResult result = influxDB.query(new Query(query));
return result.getResults().stream()
.flatMap(r -> r.getSeries().stream())
.flatMap(s -> s.getValues().stream())
.map(values -> {
Map<String, Object> map = new HashMap<>();
for (int i = 0; i < values.size(); i++) {
System.out.println("values: " + values.get(i));
}
return map;
})
.toList();
}
}

@ -0,0 +1,88 @@
package org.dromara.tsdb.component.impl;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import org.dromara.tsdb.component.InfluxDbClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class InfluxDb2xClientImpl implements InfluxDbClient {
private final InfluxDBClient influxDBClient;
private final String bucket;
public InfluxDb2xClientImpl(
@Value("${influxdb.url}") String url,
@Value("${influxdb.token}") String token,
@Value("${influxdb.org}") String org,
@Value("${influxdb.bucket}") String bucket) {
this.influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket);
this.bucket = bucket;
}
@Override
public void writeData(String measurement, Map<String, String> tags, Map<String, Object> fields) {
try (WriteApi writeApi = influxDBClient.getWriteApi()) {
Point point = Point.measurement(measurement);
tags.forEach(point::addTag);
// 处理 fields确保类型正确
fields.forEach((key, value) -> {
if (value instanceof Number) {
point.addField(key, (Number) value);
} else if (value instanceof String) {
point.addField(key, (String) value);
} else if (value instanceof Boolean) {
point.addField(key, (Boolean) value);
} else {
throw new IllegalArgumentException("Unsupported field type: " + value.getClass().getName());
}
});
writeApi.writePoint(point);
}
}
@Override
public void writeData(String measurement, Map<String, String> tags, Map<String, Object> fields,long timestamp) {
try (WriteApi writeApi = influxDBClient.getWriteApi()) {
Point point = Point.measurement(measurement)
.time(timestamp,WritePrecision.MS);//设置时间戳
tags.forEach(point::addTag);
// 处理 fields确保类型正确
fields.forEach((key, value) -> {
if (value instanceof Number) {
point.addField(key, (Number) value);
} else if (value instanceof String) {
point.addField(key, (String) value);
} else if (value instanceof Boolean) {
point.addField(key, (Boolean) value);
} else {
throw new IllegalArgumentException("Unsupported field type: " + value.getClass().getName());
}
});
writeApi.writePoint(point);
}
}
@Override
public List<Map<String, Object>> queryData(String query) {
List<FluxTable> tables = influxDBClient.getQueryApi().query(query);
return tables.stream()
.flatMap(table -> table.getRecords().stream())
.map(record -> {
Map<String, Object> map = new HashMap<>();
record.getValues().forEach((key, value) -> map.put(key, value));
return map;
})
.toList();
}
}

@ -1,7 +1,8 @@
package org.dromara.tsdb.config;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.dromara.tsdb.component.InfluxDbClient;
import org.dromara.tsdb.component.impl.InfluxDb1xClientImpl;
import org.dromara.tsdb.component.impl.InfluxDb2xClientImpl;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -9,22 +10,22 @@ import org.springframework.context.annotation.Configuration;
@Configuration
public class InfluxDbConfig {
@Value("${influxdb.url}")
private String influxDBUrl;
@Value("${influxdb.version}")
private String version;
@Value("${influxdb.username}")
private String username;
@Value("${influxdb.password}")
private String password;
@Value("${influxdb.database}")
private String database;
@Value("${influxdb.bucket}")
private String bucket;
@Bean
public InfluxDB influxDB() {
InfluxDB influxDB = InfluxDBFactory.connect(influxDBUrl, username, password);
influxDB.setDatabase(database);
return influxDB;
public InfluxDbClient influxDbClient(
InfluxDb1xClientImpl influxDb1xClient,
InfluxDb2xClientImpl influxDb2xClient) {
if ("1.x".equals(version)) {
return influxDb1xClient;
} else if ("2.x".equals(version)) {
return influxDb2xClient;
} else {
throw new IllegalArgumentException("Unsupported InfluxDB version: " + version);
}
}
}

@ -6,6 +6,9 @@ import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("/influx")
public class InfluxDbController {
@ -18,7 +21,7 @@ public class InfluxDbController {
*/
@PostMapping("/write")
public String writeData(@RequestBody InfluxMeasurementBo influxMeasurementBo) {
influxDbService.writeData(influxMeasurementBo);
influxDbService.writeData();
return "Data written to InfluxDB successfully!";
}
@ -26,7 +29,8 @@ public class InfluxDbController {
*
*/
@GetMapping("/query")
public QueryResult queryData(@RequestParam String query) {
return influxDbService.queryData(query);
public List<Map<String, Object>> queryData(@RequestParam String query) {
return influxDbService.queryData();
}
}

@ -3,19 +3,12 @@ package org.dromara.tsdb.service;
import org.dromara.tsdb.domain.bo.InfluxMeasurementBo;
import org.influxdb.dto.QueryResult;
import java.util.List;
import java.util.Map;
public interface IInfluxDbService {
/**
* InfluxDB
* @param influxMeasurementBo
*/
public void writeData(InfluxMeasurementBo influxMeasurementBo);
public void writeData();
/**
*
*
* @param query
* @return
*/
public QueryResult queryData(String query);
public List<Map<String, Object>> queryData();
}

@ -1,5 +1,6 @@
package org.dromara.tsdb.service.impl;
import org.dromara.tsdb.component.InfluxDbClient;
import org.dromara.tsdb.domain.bo.InfluxMeasurementBo;
import org.dromara.tsdb.service.IInfluxDbService;
import org.influxdb.InfluxDB;
@ -9,38 +10,52 @@ import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Service
public class InfluxDbServiceImpl implements IInfluxDbService {
@Autowired
private InfluxDB influxDB;
private InfluxDbClient influxDbClient;
/**
* InfluxDB
* @param influxMeasurementBo
*/
@Override
public void writeData(InfluxMeasurementBo influxMeasurementBo) {
Point point = Point.measurement(influxMeasurementBo.getMeasurement())
.tag(influxMeasurementBo.getTagKey(), influxMeasurementBo.getTagValue())
.addField(influxMeasurementBo.getFieldKey(), influxMeasurementBo.getFieldValue())
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.build();
influxDB.write(point);
public void writeData() {
Map<String, String> tags = Map.of("device_id", "device331");
Map<String, Object> fields = Map.of("value", 33.6);
influxDbClient.writeData("temperature", tags, fields);
}
/**
*
*
* @param query
* @return
*/
@Override
public QueryResult queryData(String query) {
QueryResult queryResult = influxDB.query(new Query(query));
System.out.println(queryResult.getResults());
return queryResult;
public List<Map<String, Object>> queryData() {
String query = "from(bucket: \"hwmom\") |> range(start: -7d) |> filter(fn: (r) => r._measurement == \"temperature\")";
List<Map<String, Object>> queryData= influxDbClient.queryData(query);
StringBuilder sb = new StringBuilder();
int index = 1;
for(Map<String, Object> map : queryData) {
if(map == null) {
sb.append("发现空Map对象\n\n");
continue;
}
sb.append("第").append(index++).append("条记录:\n");
for(Map.Entry<String, Object> entry : map.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
sb.append(" ").append(key)
.append(" = ")
.append(value != null ? value.toString() : "null")
.append("\n");
}
sb.append("\n");
}
System.out.println(sb.toString());
return queryData;
}
}

Loading…
Cancel
Save