diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java index 67ee5f6d46e7e..e2a08af86d825 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java @@ -39,11 +39,9 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.thrift.TException; import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.read.common.Field; import org.apache.tsfile.read.common.Path; import org.apache.tsfile.read.common.RowRecord; import org.apache.tsfile.utils.BytesUtils; @@ -601,7 +599,7 @@ private static void dumpResult(String sql, int index) { List names = sessionDataSet.getColumnNames(); List types = sessionDataSet.getColumnTypes(); if (EXPORT_SQL_TYPE_NAME.equalsIgnoreCase(exportType)) { - writeSqlFile(sessionDataSet, path, names, linesPerFile); + writeSqlFile(sessionDataSet, path, names); } else { if (Boolean.TRUE.equals(needDataTypePrinted)) { for (int i = 0; i < names.size(); i++) { @@ -707,128 +705,180 @@ public static void writeCsvFile( } } - public static void writeSqlFile( - SessionDataSet sessionDataSet, String filePath, List headers, int linesPerFile) + private static void exportToSqlFileWithAlignDevice( + SessionDataSet sessionDataSet, String filePath, List measurementNames) throws IOException, IoTDBConnectionException, StatementExecutionException { + + List localMeasurementNames = new ArrayList<>(measurementNames); + if (CollectionUtils.isEmpty(localMeasurementNames) || localMeasurementNames.size() <= 1) { + return; + } else { + localMeasurementNames.remove("Time"); + localMeasurementNames.remove("Device"); + } + if (CollectionUtils.isEmpty(localMeasurementNames)) { + return; + } + + String sqlPrefix = + String.format( + "INSERT INTO %s(TIMESTAMP,%s) ALIGNED VALUES (%s);\n", + "%s", String.join(",", localMeasurementNames), "%d,%s"); + + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + List columnTypeList = iterator.getColumnTypeList(); + int totalColumns = columnTypeList.size(); + String deviceName = null; int fileIndex = 0; + int currentLines = 0; + String filePathTemplate = filePath + "_%d" + ".sql"; + FileWriter writer = null; + try { + while (iterator.next()) { + if (writer == null) { + writer = new FileWriter(String.format(filePathTemplate, fileIndex)); + } + deviceName = iterator.getString(2); + if (deviceName.startsWith(SYSTEM_DATABASE + ".")) { + continue; + } + List values = new ArrayList<>(); + for (int index = 2; index < totalColumns; index++) { + String dataType = columnTypeList.get(index); + String value = iterator.getString(index + 1); + if (value == null) { + values.add("null"); + continue; + } + if ("TEXT".equalsIgnoreCase(dataType) + || "STRING".equalsIgnoreCase(dataType) + || "DATE".equalsIgnoreCase(dataType)) { + values.add(String.format("\"%s\"", value)); + } else if ("BLOB".equalsIgnoreCase(dataType)) { + if (value.length() >= 2 && (value.startsWith("0x") || value.startsWith("0X"))) { + values.add(String.format("X'%s'", value.substring(2))); + } else { + values.add(String.format("X'%s'", value)); + } + } else { + values.add(value); + } + } + long timestamp = iterator.getLong(1); + writer.write(String.format(sqlPrefix, deviceName, timestamp, String.join(",", values))); + currentLines += 1; + + if (currentLines >= linesPerFile) { + writer.flush(); + writer.close(); + fileIndex += 1; + writer = null; + currentLines = 0; + } + } + } finally { + if (writer != null) { + writer.flush(); + writer.close(); + } + } + ioTPrinter.print("\n"); + } + + private static void exportToSqlFileWithoutAlign( + SessionDataSet sessionDataSet, String filePath, List headers) + throws IoTDBConnectionException, StatementExecutionException, IOException { + + List measurementNames = new ArrayList<>(); String deviceName = null; - boolean writeNull = false; - List seriesList = new ArrayList<>(headers); if (CollectionUtils.isEmpty(headers) || headers.size() <= 1) { - writeNull = true; + return; } else { - if (headers.contains("Device")) { - seriesList.remove("Time"); - seriesList.remove("Device"); - } else { - Path path = new Path(seriesList.get(1), true); - deviceName = path.getDevice(); - seriesList.remove("Time"); - for (int i = 0; i < seriesList.size(); i++) { - String series = seriesList.get(i); - path = new Path(series, true); - seriesList.set(i, path.getMeasurement()); + List localHeaders = new ArrayList<>(headers); + localHeaders.remove("Time"); + if (CollectionUtils.isEmpty(localHeaders)) { + return; + } + Path path = new Path(localHeaders.get(0), true); + deviceName = path.getDevice(); + for (String header : localHeaders) { + path = new Path(header, true); + String meas = path.getMeasurement(); + if (path.getDevice().equals(deviceName)) { + measurementNames.add(meas); } } } - boolean hasNext = true; - while (hasNext) { - int i = 0; - final String finalFilePath = filePath + "_" + fileIndex + ".sql"; - try (FileWriter writer = new FileWriter(finalFilePath)) { - if (writeNull) { - break; + if (deviceName.startsWith(SYSTEM_DATABASE + ".")) { + return; + } + String sqlPrefix = + String.format( + "INSERT INTO %s(TIMESTAMP,%s) VALUES (%s);\n", + "%s", String.join(",", measurementNames), "%d,%s"); + + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + List columnTypeList = iterator.getColumnTypeList(); + int totalColumns = measurementNames.size(); + int fileIndex = 0; + int currentLines = 0; + String filePathTemplate = filePath + "_%d" + ".sql"; + FileWriter writer = null; + try { + while (iterator.next()) { + if (writer == null) { + writer = new FileWriter(String.format(filePathTemplate, fileIndex)); } - while (i++ < linesPerFile) { - if (sessionDataSet.hasNext()) { - RowRecord rowRecord = sessionDataSet.next(); - List fields = rowRecord.getFields(); - List headersTemp = new ArrayList<>(seriesList); - List timeseries = new ArrayList<>(); - if (headers.contains("Device")) { - deviceName = fields.get(0).toString(); - if (deviceName.startsWith(SYSTEM_DATABASE + ".")) { - continue; - } - for (String header : headersTemp) { - timeseries.add(deviceName + "." + header); - } - } else { - if (headers.get(1).startsWith(SYSTEM_DATABASE + ".")) { - continue; - } - timeseries.addAll(headers); - timeseries.remove(0); - } - String sqlMiddle = null; - if (Boolean.TRUE.equals(aligned)) { - sqlMiddle = " ALIGNED VALUES (" + rowRecord.getTimestamp() + ","; + List values = new ArrayList<>(); + for (int index = 0; index < totalColumns; index++) { + String dataType = columnTypeList.get(index + 1); + String value = iterator.getString(index + 2); + if (value == null) { + values.add("null"); + continue; + } + if ("TEXT".equalsIgnoreCase(dataType) + || "STRING".equalsIgnoreCase(dataType) + || "DATE".equalsIgnoreCase(dataType)) { + values.add(String.format("\"%s\"", value)); + } else if ("BLOB".equalsIgnoreCase(dataType)) { + if (value.length() >= 2 && (value.startsWith("0x") || value.startsWith("0X"))) { + values.add(String.format("X'%s'", value.substring(2))); } else { - sqlMiddle = " VALUES (" + rowRecord.getTimestamp() + ","; - } - List values = new ArrayList<>(); - if (headers.contains("Device")) { - fields.remove(0); + values.add(String.format("X'%s'", value)); } - for (int index = 0; index < fields.size(); index++) { - RowRecord next = - session - .executeQueryStatement("SHOW TIMESERIES " + timeseries.get(index), timeout) - .next(); - if (ObjectUtils.isNotEmpty(next)) { - List timeseriesList = next.getFields(); - String value = fields.get(index).toString(); - if (value.equals("null")) { - headersTemp.remove(seriesList.get(index)); - continue; - } - final String dataType = timeseriesList.get(3).getStringValue(); - if (TSDataType.TEXT.name().equalsIgnoreCase(dataType) - || TSDataType.STRING.name().equalsIgnoreCase(dataType)) { - values.add("\'" + value + "\'"); - } else if (TSDataType.BLOB.name().equalsIgnoreCase(dataType)) { - final byte[] v = fields.get(index).getBinaryV().getValues(); - if (v == null) { - values.add(null); - } else { - values.add( - BytesUtils.parseBlobByteArrayToString(v).replaceFirst("0x", "X'") + "'"); - } - } else if (TSDataType.DATE.name().equalsIgnoreCase(dataType)) { - final LocalDate dateV = fields.get(index).getDateV(); - if (dateV == null) { - values.add(null); - } else { - values.add("'" + dateV.toString() + "'"); - } - } else { - values.add(value); - } - } else { - headersTemp.remove(seriesList.get(index)); - continue; - } - } - if (CollectionUtils.isNotEmpty(headersTemp)) { - writer.write( - "INSERT INTO " - + deviceName - + "(TIMESTAMP," - + String.join(",", headersTemp) - + ")" - + sqlMiddle - + String.join(",", values) - + ");\n"); - } - } else { - hasNext = false; - break; + values.add(value); } } - fileIndex++; + long timestamp = iterator.getLong(1); + writer.write(String.format(sqlPrefix, deviceName, timestamp, String.join(",", values))); + currentLines += 1; + + if (currentLines >= linesPerFile) { + writer.flush(); + writer.close(); + fileIndex += 1; + writer = null; + currentLines = 0; + } + } + } finally { + if (writer != null) { writer.flush(); + writer.close(); } } + ioTPrinter.print("\n"); + } + + public static void writeSqlFile( + SessionDataSet sessionDataSet, String filePath, List headers) + throws IOException, IoTDBConnectionException, StatementExecutionException { + if (headers.contains("Device")) { + exportToSqlFileWithAlignDevice(sessionDataSet, filePath, headers); + } else { + exportToSqlFileWithoutAlign(sessionDataSet, filePath, headers); + } } }