Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 159 additions & 109 deletions iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,9 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.thrift.TException;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.Field;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.utils.BytesUtils;
Expand Down Expand Up @@ -601,7 +599,7 @@ private static void dumpResult(String sql, int index) {
List<String> names = sessionDataSet.getColumnNames();
List<String> types = sessionDataSet.getColumnTypes();
if (EXPORT_SQL_TYPE_NAME.equalsIgnoreCase(exportType)) {
writeSqlFile(sessionDataSet, path, names, linesPerFile);
writeSqlFile(sessionDataSet, path, names);
} else {
if (Boolean.TRUE.equals(needDataTypePrinted)) {
for (int i = 0; i < names.size(); i++) {
Expand Down Expand Up @@ -707,128 +705,180 @@ public static void writeCsvFile(
}
}

public static void writeSqlFile(
SessionDataSet sessionDataSet, String filePath, List<String> headers, int linesPerFile)
private static void exportToSqlFileWithAlignDevice(
SessionDataSet sessionDataSet, String filePath, List<String> measurementNames)
throws IOException, IoTDBConnectionException, StatementExecutionException {

List<String> localMeasurementNames = new ArrayList<>(measurementNames);
if (CollectionUtils.isEmpty(localMeasurementNames) || localMeasurementNames.size() <= 1) {
return;
} else {
localMeasurementNames.remove("Time");
localMeasurementNames.remove("Device");
}
if (CollectionUtils.isEmpty(localMeasurementNames)) {
return;
}

String sqlPrefix =
String.format(
"INSERT INTO %s(TIMESTAMP,%s) ALIGNED VALUES (%s);\n",
"%s", String.join(",", localMeasurementNames), "%d,%s");

SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
List<String> columnTypeList = iterator.getColumnTypeList();
int totalColumns = columnTypeList.size();
String deviceName = null;
int fileIndex = 0;
int currentLines = 0;
String filePathTemplate = filePath + "_%d" + ".sql";
FileWriter writer = null;
try {
while (iterator.next()) {
if (writer == null) {
writer = new FileWriter(String.format(filePathTemplate, fileIndex));
}
deviceName = iterator.getString(2);
if (deviceName.startsWith(SYSTEM_DATABASE + ".")) {
continue;
}
List<String> values = new ArrayList<>();
for (int index = 2; index < totalColumns; index++) {
String dataType = columnTypeList.get(index);
String value = iterator.getString(index + 1);
if (value == null) {
values.add("null");
continue;
}
if ("TEXT".equalsIgnoreCase(dataType)
|| "STRING".equalsIgnoreCase(dataType)
|| "DATE".equalsIgnoreCase(dataType)) {
values.add(String.format("\"%s\"", value));
} else if ("BLOB".equalsIgnoreCase(dataType)) {
if (value.length() >= 2 && (value.startsWith("0x") || value.startsWith("0X"))) {
values.add(String.format("X'%s'", value.substring(2)));
} else {
values.add(String.format("X'%s'", value));
}
} else {
values.add(value);
}
}
long timestamp = iterator.getLong(1);
writer.write(String.format(sqlPrefix, deviceName, timestamp, String.join(",", values)));
currentLines += 1;

if (currentLines >= linesPerFile) {
writer.flush();
writer.close();
fileIndex += 1;
writer = null;
currentLines = 0;
}
}
} finally {
if (writer != null) {
writer.flush();
writer.close();
}
}
ioTPrinter.print("\n");
}

private static void exportToSqlFileWithoutAlign(
SessionDataSet sessionDataSet, String filePath, List<String> headers)
throws IoTDBConnectionException, StatementExecutionException, IOException {

List<String> measurementNames = new ArrayList<>();
String deviceName = null;
boolean writeNull = false;
List<String> seriesList = new ArrayList<>(headers);
if (CollectionUtils.isEmpty(headers) || headers.size() <= 1) {
writeNull = true;
return;
} else {
if (headers.contains("Device")) {
seriesList.remove("Time");
seriesList.remove("Device");
} else {
Path path = new Path(seriesList.get(1), true);
deviceName = path.getDevice();
seriesList.remove("Time");
for (int i = 0; i < seriesList.size(); i++) {
String series = seriesList.get(i);
path = new Path(series, true);
seriesList.set(i, path.getMeasurement());
List<String> localHeaders = new ArrayList<>(headers);
localHeaders.remove("Time");
if (CollectionUtils.isEmpty(localHeaders)) {
return;
}
Path path = new Path(localHeaders.get(0), true);
deviceName = path.getDevice();
for (String header : localHeaders) {
path = new Path(header, true);
String meas = path.getMeasurement();
if (path.getDevice().equals(deviceName)) {
measurementNames.add(meas);
}
}
}
boolean hasNext = true;
while (hasNext) {
int i = 0;
final String finalFilePath = filePath + "_" + fileIndex + ".sql";
try (FileWriter writer = new FileWriter(finalFilePath)) {
if (writeNull) {
break;
if (deviceName.startsWith(SYSTEM_DATABASE + ".")) {
return;
}
String sqlPrefix =
String.format(
"INSERT INTO %s(TIMESTAMP,%s) VALUES (%s);\n",
"%s", String.join(",", measurementNames), "%d,%s");

SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
List<String> columnTypeList = iterator.getColumnTypeList();
int totalColumns = measurementNames.size();
int fileIndex = 0;
int currentLines = 0;
String filePathTemplate = filePath + "_%d" + ".sql";
FileWriter writer = null;
try {
while (iterator.next()) {
if (writer == null) {
writer = new FileWriter(String.format(filePathTemplate, fileIndex));
}
while (i++ < linesPerFile) {
if (sessionDataSet.hasNext()) {
RowRecord rowRecord = sessionDataSet.next();
List<Field> fields = rowRecord.getFields();
List<String> headersTemp = new ArrayList<>(seriesList);
List<String> timeseries = new ArrayList<>();
if (headers.contains("Device")) {
deviceName = fields.get(0).toString();
if (deviceName.startsWith(SYSTEM_DATABASE + ".")) {
continue;
}
for (String header : headersTemp) {
timeseries.add(deviceName + "." + header);
}
} else {
if (headers.get(1).startsWith(SYSTEM_DATABASE + ".")) {
continue;
}
timeseries.addAll(headers);
timeseries.remove(0);
}
String sqlMiddle = null;
if (Boolean.TRUE.equals(aligned)) {
sqlMiddle = " ALIGNED VALUES (" + rowRecord.getTimestamp() + ",";
List<String> values = new ArrayList<>();
for (int index = 0; index < totalColumns; index++) {
String dataType = columnTypeList.get(index + 1);
String value = iterator.getString(index + 2);
if (value == null) {
values.add("null");
continue;
}
if ("TEXT".equalsIgnoreCase(dataType)
|| "STRING".equalsIgnoreCase(dataType)
|| "DATE".equalsIgnoreCase(dataType)) {
values.add(String.format("\"%s\"", value));
} else if ("BLOB".equalsIgnoreCase(dataType)) {
if (value.length() >= 2 && (value.startsWith("0x") || value.startsWith("0X"))) {
values.add(String.format("X'%s'", value.substring(2)));
} else {
sqlMiddle = " VALUES (" + rowRecord.getTimestamp() + ",";
}
List<String> values = new ArrayList<>();
if (headers.contains("Device")) {
fields.remove(0);
values.add(String.format("X'%s'", value));
}
for (int index = 0; index < fields.size(); index++) {
RowRecord next =
session
.executeQueryStatement("SHOW TIMESERIES " + timeseries.get(index), timeout)
.next();
if (ObjectUtils.isNotEmpty(next)) {
List<Field> timeseriesList = next.getFields();
String value = fields.get(index).toString();
if (value.equals("null")) {
headersTemp.remove(seriesList.get(index));
continue;
}
final String dataType = timeseriesList.get(3).getStringValue();
if (TSDataType.TEXT.name().equalsIgnoreCase(dataType)
|| TSDataType.STRING.name().equalsIgnoreCase(dataType)) {
values.add("\'" + value + "\'");
} else if (TSDataType.BLOB.name().equalsIgnoreCase(dataType)) {
final byte[] v = fields.get(index).getBinaryV().getValues();
if (v == null) {
values.add(null);
} else {
values.add(
BytesUtils.parseBlobByteArrayToString(v).replaceFirst("0x", "X'") + "'");
}
} else if (TSDataType.DATE.name().equalsIgnoreCase(dataType)) {
final LocalDate dateV = fields.get(index).getDateV();
if (dateV == null) {
values.add(null);
} else {
values.add("'" + dateV.toString() + "'");
}
} else {
values.add(value);
}
} else {
headersTemp.remove(seriesList.get(index));
continue;
}
}
if (CollectionUtils.isNotEmpty(headersTemp)) {
writer.write(
"INSERT INTO "
+ deviceName
+ "(TIMESTAMP,"
+ String.join(",", headersTemp)
+ ")"
+ sqlMiddle
+ String.join(",", values)
+ ");\n");
}

} else {
hasNext = false;
break;
values.add(value);
}
}
fileIndex++;
long timestamp = iterator.getLong(1);
writer.write(String.format(sqlPrefix, deviceName, timestamp, String.join(",", values)));
currentLines += 1;

if (currentLines >= linesPerFile) {
writer.flush();
writer.close();
fileIndex += 1;
writer = null;
currentLines = 0;
}
}
} finally {
if (writer != null) {
writer.flush();
writer.close();
}
}
ioTPrinter.print("\n");
}

public static void writeSqlFile(
SessionDataSet sessionDataSet, String filePath, List<String> headers)
throws IOException, IoTDBConnectionException, StatementExecutionException {
if (headers.contains("Device")) {
exportToSqlFileWithAlignDevice(sessionDataSet, filePath, headers);
} else {
exportToSqlFileWithoutAlign(sessionDataSet, filePath, headers);
}
}
}
Loading