diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java index b61a13693dc1..9079daeab71d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java @@ -31,7 +31,6 @@ import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -271,68 +270,9 @@ private void testPipeWithMultiplePatterns( } @Test - @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch") - public void testMultiplePrefixPatternHistoricalData() throws Exception { - // Define source attributes - final Map sourceAttributes = new HashMap<>(); - sourceAttributes.put("source.pattern", "root.db.d1.s, root.db2.d1.s"); - sourceAttributes.put("source.inclusion", "data.insert"); - sourceAttributes.put("user", "root"); - - // Define data to be inserted - final List insertQueries = - Arrays.asList( - "insert into root.db.d1(time, s, s1) values (1, 1, 1)", - "insert into root.db.d2(time, s) values (2, 2)", - "insert into root.db2.d1(time, s) values (3, 3)"); - - // Define expected results on receiver - final Set expectedResSet = new HashSet<>(); - expectedResSet.add("1,null,1.0,1.0,"); - expectedResSet.add("3,3.0,null,null,"); - - // Execute the common test logic - testPipeWithMultiplePatterns( - sourceAttributes, - insertQueries, - true, // isHistorical = true - "select * from root.db2.**,root.db.**", - "Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,", - expectedResSet); - } - - @Test - @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch") - public void testMultiplePrefixPatternRealtimeData() throws Exception { - final Map sourceAttributes = new HashMap<>(); - sourceAttributes.put("source.pattern", "root.db.d1.s, root.db2.d1.s"); - sourceAttributes.put("source.inclusion", "data.insert"); - sourceAttributes.put("user", "root"); - - final List insertQueries = - Arrays.asList( - "insert into root.db.d1(time, s, s1) values (1, 1, 1)", - "insert into root.db.d2(time, s) values (2, 2)", - "insert into root.db2.d1(time, s) values (3, 3)"); - - final Set expectedResSet = new HashSet<>(); - expectedResSet.add("1,null,1.0,1.0,"); - expectedResSet.add("3,3.0,null,null,"); - - testPipeWithMultiplePatterns( - sourceAttributes, - insertQueries, - false, // isHistorical = false - "select * from root.db2.**,root.db.**", - "Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,", - expectedResSet); - } - - @Test - @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch") public void testMultipleIoTDBPatternHistoricalData() throws Exception { final Map sourceAttributes = new HashMap<>(); - sourceAttributes.put("source.path", "root.db.**, root.db2.d1.*"); + sourceAttributes.put("source.pattern.inclusion", "root.db.**, root.db2.d1.*"); sourceAttributes.put("source.inclusion", "data.insert"); sourceAttributes.put("user", "root"); @@ -358,10 +298,9 @@ public void testMultipleIoTDBPatternHistoricalData() throws Exception { } @Test - @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch") public void testMultipleIoTDBPatternRealtimeData() throws Exception { final Map sourceAttributes = new HashMap<>(); - sourceAttributes.put("source.path", "root.db.**, root.db2.d1.*"); + sourceAttributes.put("source.pattern.inclusion", "root.db.**, root.db2.d1.*"); sourceAttributes.put("source.inclusion", "data.insert"); sourceAttributes.put("user", "root"); @@ -387,129 +326,12 @@ public void testMultipleIoTDBPatternRealtimeData() throws Exception { } @Test - @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch") - public void testMultipleHybridPatternHistoricalData() throws Exception { - final Map sourceAttributes = new HashMap<>(); - sourceAttributes.put("source.path", "root.db.d1.*"); - sourceAttributes.put("source.pattern", "root.db2.d1.s"); - sourceAttributes.put("source.inclusion", "data.insert"); - sourceAttributes.put("user", "root"); - - final List insertQueries = - Arrays.asList( - "insert into root.db.d1(time, s, s1) values (1, 1, 1)", - "insert into root.db2.d1(time, s) values (2, 2)", - "insert into root.db3.d1(time, s) values (3, 3)"); - - final Set expectedResSet = new HashSet<>(); - expectedResSet.add("1,1.0,1.0,null,"); - expectedResSet.add("2,null,null,2.0,"); - - testPipeWithMultiplePatterns( - sourceAttributes, - insertQueries, - true, // isHistorical = true - "select * from root.db.**,root.db2.**", - "Time,root.db.d1.s,root.db.d1.s1,root.db2.d1.s,", - expectedResSet); - } - - @Test - @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch") - public void testMultipleHybridPatternRealtimeData() throws Exception { - final Map sourceAttributes = new HashMap<>(); - sourceAttributes.put("source.path", "root.db.d1.*"); - sourceAttributes.put("source.pattern", "root.db2.d1.s"); - sourceAttributes.put("source.inclusion", "data.insert"); - sourceAttributes.put("user", "root"); - - final List insertQueries = - Arrays.asList( - "insert into root.db.d1(time, s, s1) values (1, 1, 1)", - "insert into root.db2.d1(time, s) values (2, 2)", - "insert into root.db3.d1(time, s) values (3, 3)"); - - final Set expectedResSet = new HashSet<>(); - expectedResSet.add("1,1.0,1.0,null,"); - expectedResSet.add("2,null,null,2.0,"); - - testPipeWithMultiplePatterns( - sourceAttributes, - insertQueries, - false, // isHistorical = false - "select * from root.db.**,root.db2.**", - "Time,root.db.d1.s,root.db.d1.s1,root.db2.d1.s,", - expectedResSet); - } - - @Test - @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch") - public void testPrefixPatternWithExclusionHistoricalData() throws Exception { - final Map sourceAttributes = new HashMap<>(); - // Inclusion: Match everything under root.db.d1 and root.db.d2 - sourceAttributes.put("source.pattern", "root.db.d1, root.db.d2"); - // Exclusion: Exclude anything with the prefix root.db.d1.s1 - sourceAttributes.put("source.pattern.exclusion", "root.db.d1.s1"); - sourceAttributes.put("source.inclusion", "data.insert"); - sourceAttributes.put("user", "root"); - - final List insertQueries = - Arrays.asList( - // s matches, s1 is excluded - "insert into root.db.d1(time, s, s1) values (1, 1, 1)", - // s matches - "insert into root.db.d2(time, s) values (2, 2)", - "insert into root.db1.d1(time, s) values (3, 3)"); - - final Set expectedResSet = new HashSet<>(); - expectedResSet.add("1,1.0,null,"); - expectedResSet.add("2,null,2.0,"); - - testPipeWithMultiplePatterns( - sourceAttributes, - insertQueries, - true, // isHistorical = true - "select * from root.db.**", - "Time,root.db.d1.s,root.db.d2.s,", - expectedResSet); - } - - @Test - @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch") - public void testPrefixPatternWithExclusionRealtimeData() throws Exception { - final Map sourceAttributes = new HashMap<>(); - sourceAttributes.put("source.pattern", "root.db.d1, root.db.d2"); - sourceAttributes.put("source.pattern.exclusion", "root.db.d1.s1"); - sourceAttributes.put("source.inclusion", "data.insert"); - sourceAttributes.put("user", "root"); - - final List insertQueries = - Arrays.asList( - "insert into root.db.d1(time, s, s1) values (1, 1, 1)", - "insert into root.db.d2(time, s) values (2, 2)", - "insert into root.db1.d1(time, s) values (3, 3)"); - - final Set expectedResSet = new HashSet<>(); - expectedResSet.add("1,1.0,null,"); - expectedResSet.add("2,null,2.0,"); - - testPipeWithMultiplePatterns( - sourceAttributes, - insertQueries, - false, // isHistorical = false - "select * from root.db.**", - "Time,root.db.d1.s,root.db.d2.s,", - expectedResSet); - } - - @Test - @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch") public void testIoTDBPatternWithExclusionHistoricalData() throws Exception { final Map sourceAttributes = new HashMap<>(); // Inclusion: Match everything under root.db - sourceAttributes.put("source.path", "root.db.**"); + sourceAttributes.put("source.pattern.inclusion", "root.db.**"); // Exclusion: Exclude root.db.d1.s* and root.db.d3.* - sourceAttributes.put("source.path.exclusion", "root.db.d1.s*, root.db.d3.*"); + sourceAttributes.put("source.pattern.exclusion", "root.db.d1.s*, root.db.d3.*"); sourceAttributes.put("source.inclusion", "data.insert"); sourceAttributes.put("user", "root"); @@ -537,11 +359,10 @@ public void testIoTDBPatternWithExclusionHistoricalData() throws Exception { } @Test - @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch") public void testIoTDBPatternWithExclusionRealtimeData() throws Exception { final Map sourceAttributes = new HashMap<>(); - sourceAttributes.put("source.path", "root.db.**"); - sourceAttributes.put("source.path.exclusion", "root.db.d1.s*, root.db.d3.*"); + sourceAttributes.put("source.pattern.inclusion", "root.db.**"); + sourceAttributes.put("source.pattern.exclusion", "root.db.d1.s*, root.db.d3.*"); sourceAttributes.put("source.inclusion", "data.insert"); sourceAttributes.put("user", "root"); @@ -564,71 +385,4 @@ public void testIoTDBPatternWithExclusionRealtimeData() throws Exception { "Time,root.db.d1.t,root.db.d2.s,", expectedResSet); } - - @Test - @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch") - public void testHybridPatternWithHybridExclusionHistoricalData() throws Exception { - final Map sourceAttributes = new HashMap<>(); - // Inclusion: Match root.db.** (IoTDB) AND root.db2.d1 (Prefix) - sourceAttributes.put("source.path", "root.db.**"); - sourceAttributes.put("source.pattern", "root.db2.d1"); - // Exclusion: Exclude root.db.d1.* (IoTDB) AND root.db2.d1.s (Prefix) - sourceAttributes.put("source.path.exclusion", "root.db.d1.*"); - sourceAttributes.put("source.pattern.exclusion", "root.db2.d1.s"); - sourceAttributes.put("source.inclusion", "data.insert"); - sourceAttributes.put("user", "root"); - - final List insertQueries = - Arrays.asList( - // s, s1 excluded by path.exclusion - "insert into root.db.d1(time, s, s1) values (1, 1, 1)", - // s matches - "insert into root.db.d2(time, s) values (2, 2)", - // s excluded by pattern.exclusion, t matches - "insert into root.db2.d1(time, s, t) values (3, 3, 3)", - "insert into root.db3.d1(time, s) values (4, 4)"); - - final Set expectedResSet = new HashSet<>(); - expectedResSet.add("2,2.0,null,"); - expectedResSet.add("3,null,3.0,"); - - testPipeWithMultiplePatterns( - sourceAttributes, - insertQueries, - true, // isHistorical = true - "select * from root.db.**,root.db2.**", - "Time,root.db.d2.s,root.db2.d1.t,", - expectedResSet); - } - - @Test - @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch") - public void testHybridPatternWithHybridExclusionRealtimeData() throws Exception { - final Map sourceAttributes = new HashMap<>(); - sourceAttributes.put("source.path", "root.db.**"); - sourceAttributes.put("source.pattern", "root.db2.d1"); - sourceAttributes.put("source.path.exclusion", "root.db.d1.*"); - sourceAttributes.put("source.pattern.exclusion", "root.db2.d1.s"); - sourceAttributes.put("source.inclusion", "data.insert"); - sourceAttributes.put("user", "root"); - - final List insertQueries = - Arrays.asList( - "insert into root.db.d1(time, s, s1) values (1, 1, 1)", - "insert into root.db.d2(time, s) values (2, 2)", - "insert into root.db2.d1(time, s, t) values (3, 3, 3)", - "insert into root.db3.d1(time, s) values (4, 4)"); - - final Set expectedResSet = new HashSet<>(); - expectedResSet.add("2,2.0,null,"); - expectedResSet.add("3,null,3.0,"); - - testPipeWithMultiplePatterns( - sourceAttributes, - insertQueries, - false, // isHistorical = false - "select * from root.db.**,root.db2.**", - "Time,root.db.d2.s,root.db2.d1.t,", - expectedResSet); - } } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeInclusionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeInclusionIT.java index 5bc4f4680621..a95e8535d422 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeInclusionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeInclusionIT.java @@ -29,7 +29,6 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -114,7 +113,6 @@ public void testPureSchemaInclusion() throws Exception { } @Test - @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch") public void testPureSchemaInclusionWithMultiplePattern() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); @@ -127,7 +125,7 @@ public void testPureSchemaInclusionWithMultiplePattern() throws Exception { final Map processorAttributes = new HashMap<>(); final Map sinkAttributes = new HashMap<>(); - sourceAttributes.put("path", "root.ln.wf01.wt01.status,root.ln.wf02.**"); + sourceAttributes.put("source.pattern.inclusion", "root.ln.wf01.wt01.status,root.ln.wf02.**"); sourceAttributes.put("source.inclusion", "schema"); sourceAttributes.put("user", "root"); @@ -187,7 +185,6 @@ public void testPureSchemaInclusionWithMultiplePattern() throws Exception { } @Test - @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch") public void testPureSchemaInclusionWithExclusionPattern() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); @@ -200,13 +197,13 @@ public void testPureSchemaInclusionWithExclusionPattern() throws Exception { final Map processorAttributes = new HashMap<>(); final Map sinkAttributes = new HashMap<>(); - sourceAttributes.put("extractor.inclusion", "schema"); + sourceAttributes.put("source.inclusion", "schema"); sourceAttributes.put("user", "root"); // Include root.ln.** - sourceAttributes.put("path", "root.ln.**"); + sourceAttributes.put("source.pattern.inclusion", "root.ln.**"); // Exclude root.ln.wf02.* and root.ln.wf03.wt01.status - sourceAttributes.put("path.exclusion", "root.ln.wf02.**, root.ln.wf03.wt01.status"); + sourceAttributes.put("source.pattern.exclusion", "root.ln.wf02.**, root.ln.wf03.wt01.status"); sinkAttributes.put("connector", "iotdb-thrift-connector"); sinkAttributes.put("connector.ip", receiverIp); @@ -314,7 +311,7 @@ public void testAuthInclusionWithPattern() throws Exception { final Map sinkAttributes = new HashMap<>(); sourceAttributes.put("source.inclusion", "auth"); - sourceAttributes.put("path", "root.ln.**"); + sourceAttributes.put("source.pattern.inclusion", "root.ln.**"); sourceAttributes.put("user", "root"); sinkAttributes.put("sink", "iotdb-thrift-sink"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 44f02bbd4bba..2f1be77fe3ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -109,7 +109,10 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_EXCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_EXCLUSION_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_INCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY; @@ -118,7 +121,10 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_START_TIME_KEY; @@ -808,10 +814,23 @@ private long calculateTsFileParserMemory( // If the source has pattern or path, we need to allocate memory isTSFileParser = isTSFileParser - || sourceParameters.hasAnyAttributes(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY); + || sourceParameters.hasAnyAttributes( + EXTRACTOR_PATTERN_KEY, + SOURCE_PATTERN_KEY, + EXTRACTOR_PATTERN_INCLUSION_KEY, + SOURCE_PATTERN_INCLUSION_KEY, + EXTRACTOR_PATTERN_EXCLUSION_KEY, + SOURCE_PATTERN_EXCLUSION_KEY); isTSFileParser = - isTSFileParser || sourceParameters.hasAnyAttributes(EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY); + isTSFileParser + || sourceParameters.hasAnyAttributes( + EXTRACTOR_PATH_KEY, + SOURCE_PATH_KEY, + EXTRACTOR_PATTERN_INCLUSION_KEY, + SOURCE_PATTERN_INCLUSION_KEY, + EXTRACTOR_PATH_EXCLUSION_KEY, + SOURCE_PATH_EXCLUSION_KEY); // If the source is not hybrid, we do need to allocate memory isTSFileParser = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java index 4e69b47cc8df..d1ceb48e8aad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java @@ -78,6 +78,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_INCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY; @@ -109,6 +110,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_FORMAT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_LOOSE_RANGE_KEY; @@ -180,7 +182,12 @@ public void validate(final PipeParameterValidator validator) throws Exception { && validator .getParameters() .hasAnyAttributes( - EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY, EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY)) { + EXTRACTOR_PATH_KEY, + SOURCE_PATH_KEY, + EXTRACTOR_PATTERN_KEY, + SOURCE_PATTERN_KEY, + EXTRACTOR_PATTERN_INCLUSION_KEY, + SOURCE_PATTERN_INCLUSION_KEY)) { throw new PipeException( "The pipe cannot extract tree model data when sql dialect is set to table."); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java index 62868f0492e0..d5a209e5d87b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java @@ -21,19 +21,16 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; -import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixTreePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBTreePattern; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import java.util.HashMap; -@Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch") public class TreePatternPruningTest { @Test @@ -42,7 +39,7 @@ public void testUnionInternalPruning_Cover() { new PipeParameters( new HashMap() { { - put(PipeSourceConstant.SOURCE_PATH_KEY, "root.db.d1.*,root.db.d1.s1"); + put(PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY, "root.db.d1.*,root.db.d1.s1"); } }); @@ -58,7 +55,7 @@ public void testUnionInternalPruning_Duplicates() { new PipeParameters( new HashMap() { { - put(PipeSourceConstant.SOURCE_PATH_KEY, "root.db.d1,root.db.d1"); + put(PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY, "root.db.d1,root.db.d1"); } }); @@ -74,8 +71,8 @@ public void testInclusionPrunedByExclusion_Partial() { new PipeParameters( new HashMap() { { - put(PipeSourceConstant.SOURCE_PATH_KEY, "root.sg.d1,root.sg.d2"); - put(PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY, "root.sg.d1"); + put(PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY, "root.sg.d1,root.sg.d2"); + put(PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY, "root.sg.d1"); } }); @@ -91,8 +88,8 @@ public void testInclusionPrunedByExclusion_FullCoverage_Exception() { new PipeParameters( new HashMap() { { - put(PipeSourceConstant.SOURCE_PATH_KEY, "root.sg.d1"); - put(PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY, "root.sg.**"); + put(PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY, "root.sg.d1"); + put(PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY, "root.sg.**"); } }); @@ -110,8 +107,10 @@ public void testComplexPruning() { new PipeParameters( new HashMap() { { - put(PipeSourceConstant.SOURCE_PATH_KEY, "root.sg.A,root.sg.B,root.sg.A.sub"); - put(PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY, "root.sg.A,root.sg.A.**"); + put( + PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY, + "root.sg.A,root.sg.B,root.sg.A.sub"); + put(PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY, "root.sg.A,root.sg.A.**"); } }); @@ -122,21 +121,21 @@ public void testComplexPruning() { } @Test - public void testComplexPruning_Prefix() { + public void testLegacyPatternMultipleRulesRejected() { final PipeParameters params = new PipeParameters( new HashMap() { { - put(PipeSourceConstant.SOURCE_PATTERN_KEY, "root.sg.A,root.sg.B,root.sg.A.sub"); - put(PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY, "root.sg.A"); - put(PipeSourceConstant.SOURCE_PATTERN_FORMAT_KEY, "prefix"); + put(PipeSourceConstant.SOURCE_PATTERN_KEY, "root.sg.A,root.sg.B"); } }); - final TreePattern result = TreePattern.parsePipePatternFromSourceParameters(params); - - Assert.assertTrue(result instanceof PrefixTreePattern); - Assert.assertEquals("root.sg.B", result.getPattern()); + try { + TreePattern.parsePipePatternFromSourceParameters(params); + Assert.fail("Should throw PipeException for legacy multi-pattern parameters"); + } catch (final PipeException ignored) { + // Expected exception + } } @Test @@ -145,8 +144,8 @@ public void testUnionPreservedWhenNotCovered() { new PipeParameters( new HashMap() { { - put(PipeSourceConstant.SOURCE_PATH_KEY, "root.sg.d1,root.sg.d2"); - put(PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY, "root.other"); + put(PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY, "root.sg.d1,root.sg.d2"); + put(PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY, "root.other"); } }); @@ -155,4 +154,22 @@ public void testUnionPreservedWhenNotCovered() { Assert.assertTrue(result instanceof UnionIoTDBTreePattern); Assert.assertEquals("root.sg.d1,root.sg.d2", result.getPattern()); } + + @Test + public void testLegacyPathMultipleRulesRejected() { + final PipeParameters params = + new PipeParameters( + new HashMap() { + { + put(PipeSourceConstant.SOURCE_PATH_KEY, "root.sg.d1,root.sg.d2"); + } + }); + + try { + TreePattern.parsePipePatternFromSourceParameters(params); + Assert.fail("Should throw PipeException for legacy multi-path parameters"); + } catch (final PipeException ignored) { + // Expected exception + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/IoTDBDataRegionSourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/IoTDBDataRegionSourceTest.java index feca8d54fed1..dd0558bc2380 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/IoTDBDataRegionSourceTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/IoTDBDataRegionSourceTest.java @@ -25,7 +25,6 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import java.util.HashMap; @@ -54,7 +53,6 @@ public void testIoTDBDataRegionExtractor() { } @Test - @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch") public void testIoTDBDataRegionExtractorWithPattern() { Assert.assertEquals( IllegalArgumentException.class, @@ -84,8 +82,6 @@ public void testIoTDBDataRegionExtractorWithPattern() { Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root")); Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.`a-b`")); Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.1")); - Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.a,root.b")); - Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.a,root.b,root.db1.`a,b`.**")); } public Exception testIoTDBDataRegionExtractorWithPattern(final String pattern) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java index d13bdf7d046c..7107614ebd57 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java @@ -52,6 +52,8 @@ public class PipeSourceConstant { public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern"; public static final String SOURCE_PATTERN_KEY = "source.pattern"; + public static final String EXTRACTOR_PATTERN_INCLUSION_KEY = "extractor.pattern.inclusion"; + public static final String SOURCE_PATTERN_INCLUSION_KEY = "source.pattern.inclusion"; public static final String EXTRACTOR_PATH_KEY = "extractor.path"; public static final String SOURCE_PATH_KEY = "source.path"; public static final String EXTRACTOR_PATTERN_FORMAT_KEY = "extractor.pattern.format"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java index 9a9360a0abf1..0cea3b34e489 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java @@ -47,11 +47,13 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_INCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_FORMAT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_KEY; public abstract class TreePattern { @@ -143,18 +145,7 @@ public static List applyIndexesOnList( */ public static TreePattern parsePipePatternFromSourceParameters( final PipeParameters sourceParameters) { - final TreePattern treePattern = parsePipePatternFromSourceParametersInternal(sourceParameters); - if (!treePattern.isSingle()) { - final String msg = - String.format( - "Pipe: The provided pattern should be single now. " + "Inclusion: %s, Exclusion: %s", - sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY), - sourceParameters.getStringByKeys( - EXTRACTOR_PATTERN_EXCLUSION_KEY, SOURCE_PATTERN_EXCLUSION_KEY)); - LOGGER.warn(msg); - throw new PipeException(msg); - } - return treePattern; + return parsePipePatternFromSourceParametersInternal(sourceParameters); } public static TreePattern parsePipePatternFromSourceParametersInternal( @@ -162,15 +153,41 @@ public static TreePattern parsePipePatternFromSourceParametersInternal( final boolean isTreeModelDataAllowedToBeCaptured = isTreeModelDataAllowToBeCaptured(sourceParameters); + final boolean hasPatternInclusionKey = + sourceParameters.hasAnyAttributes( + EXTRACTOR_PATTERN_INCLUSION_KEY, SOURCE_PATTERN_INCLUSION_KEY); + final boolean hasLegacyPathKey = + sourceParameters.hasAnyAttributes(EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY); + final boolean hasLegacyPatternKey = + sourceParameters.hasAnyAttributes(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY); + + if (hasPatternInclusionKey && (hasLegacyPathKey || hasLegacyPatternKey)) { + final String msg = + String.format( + "Pipe: %s cannot be used together with %s or %s.", + SOURCE_PATTERN_INCLUSION_KEY, SOURCE_PATTERN_KEY, SOURCE_PATH_KEY); + LOGGER.warn(msg); + throw new PipeException(msg); + } + // 1. Parse INCLUSION patterns into a list List inclusionPatterns = - parsePatternList( - sourceParameters, - isTreeModelDataAllowedToBeCaptured, - EXTRACTOR_PATH_KEY, - SOURCE_PATH_KEY, - EXTRACTOR_PATTERN_KEY, - SOURCE_PATTERN_KEY); + hasPatternInclusionKey + ? parseIoTDBPatternList( + sourceParameters.getStringByKeys( + EXTRACTOR_PATTERN_INCLUSION_KEY, SOURCE_PATTERN_INCLUSION_KEY), + isTreeModelDataAllowedToBeCaptured, + true, + SOURCE_PATTERN_INCLUSION_KEY) + : parseLegacyPatternList( + sourceParameters, + isTreeModelDataAllowedToBeCaptured, + EXTRACTOR_PATH_KEY, + SOURCE_PATH_KEY, + EXTRACTOR_PATTERN_KEY, + SOURCE_PATTERN_KEY, + SOURCE_PATH_KEY, + SOURCE_PATTERN_KEY); // If no inclusion patterns are specified, use default "root.**" if (inclusionPatterns.isEmpty()) { @@ -181,14 +198,34 @@ public static TreePattern parsePipePatternFromSourceParametersInternal( } // 2. Parse EXCLUSION patterns into a list + if (hasPatternInclusionKey + && sourceParameters.hasAnyAttributes( + EXTRACTOR_PATH_EXCLUSION_KEY, SOURCE_PATH_EXCLUSION_KEY)) { + final String msg = + String.format( + "Pipe: %s cannot be used together with %s.", + SOURCE_PATTERN_INCLUSION_KEY, SOURCE_PATH_EXCLUSION_KEY); + LOGGER.warn(msg); + throw new PipeException(msg); + } + List exclusionPatterns = - parsePatternList( - sourceParameters, - isTreeModelDataAllowedToBeCaptured, - EXTRACTOR_PATH_EXCLUSION_KEY, - SOURCE_PATH_EXCLUSION_KEY, - EXTRACTOR_PATTERN_EXCLUSION_KEY, - SOURCE_PATTERN_EXCLUSION_KEY); + hasPatternInclusionKey + ? parseIoTDBPatternList( + sourceParameters.getStringByKeys( + EXTRACTOR_PATTERN_EXCLUSION_KEY, SOURCE_PATTERN_EXCLUSION_KEY), + isTreeModelDataAllowedToBeCaptured, + true, + SOURCE_PATTERN_EXCLUSION_KEY) + : parseLegacyPatternList( + sourceParameters, + isTreeModelDataAllowedToBeCaptured, + EXTRACTOR_PATH_EXCLUSION_KEY, + SOURCE_PATH_EXCLUSION_KEY, + EXTRACTOR_PATTERN_EXCLUSION_KEY, + SOURCE_PATTERN_EXCLUSION_KEY, + SOURCE_PATH_EXCLUSION_KEY, + SOURCE_PATTERN_EXCLUSION_KEY); // 3. Optimize the lists: remove redundant patterns (e.g., if "root.**" exists, "root.db" is // redundant) @@ -206,9 +243,18 @@ public static TreePattern parsePipePatternFromSourceParametersInternal( "Pipe: The provided exclusion pattern fully covers the inclusion pattern. " + "This pipe pattern will match nothing. " + "Inclusion: %s, Exclusion: %s", - sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY), sourceParameters.getStringByKeys( - EXTRACTOR_PATTERN_EXCLUSION_KEY, SOURCE_PATTERN_EXCLUSION_KEY)); + EXTRACTOR_PATTERN_INCLUSION_KEY, + SOURCE_PATTERN_INCLUSION_KEY, + EXTRACTOR_PATH_KEY, + SOURCE_PATH_KEY, + EXTRACTOR_PATTERN_KEY, + SOURCE_PATTERN_KEY), + sourceParameters.getStringByKeys( + EXTRACTOR_PATTERN_EXCLUSION_KEY, + SOURCE_PATTERN_EXCLUSION_KEY, + EXTRACTOR_PATH_EXCLUSION_KEY, + SOURCE_PATH_EXCLUSION_KEY)); LOGGER.warn(msg); throw new PipeException(msg); } @@ -319,37 +365,71 @@ public static TreePattern parsePatternFromString( } /** - * Helper method to parse pattern parameters into a list of patterns without creating the Union - * object immediately. + * Helper method to parse legacy pattern parameters into a list of patterns without creating the + * Union object immediately. */ - private static List parsePatternList( + private static List parseLegacyPatternList( final PipeParameters sourceParameters, final boolean isTreeModelDataAllowedToBeCaptured, final String extractorPathKey, final String sourcePathKey, final String extractorPatternKey, - final String sourcePatternKey) { + final String sourcePatternKey, + final String pathKeyName, + final String patternKeyName) { final String path = sourceParameters.getStringByKeys(extractorPathKey, sourcePathKey); final String pattern = sourceParameters.getStringByKeys(extractorPatternKey, sourcePatternKey); + if (path != null && pattern != null) { + final String msg = + String.format("Pipe: %s and %s cannot be used together.", pathKeyName, patternKeyName); + LOGGER.warn(msg); + throw new PipeException(msg); + } + final List result = new ArrayList<>(); if (path != null) { result.addAll( - parseMultiplePatterns( - path, p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p))); + parseIoTDBPatternList(path, isTreeModelDataAllowedToBeCaptured, false, pathKeyName)); } if (pattern != null) { result.addAll( parsePatternsFromPatternParameter( - pattern, sourceParameters, isTreeModelDataAllowedToBeCaptured)); + pattern, + sourceParameters, + isTreeModelDataAllowedToBeCaptured, + false, + patternKeyName)); } return result; } + private static List parseIoTDBPatternList( + final String pattern, + final boolean isTreeModelDataAllowedToBeCaptured, + final boolean allowMultiple, + final String parameterKey) { + if (pattern == null) { + return new ArrayList<>(); + } + + final List patterns = + parseMultiplePatterns( + pattern, p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p)); + + if (!allowMultiple && patterns.size() > 1) { + final String msg = + String.format("Pipe: The parameter %s only supports a single pattern now.", parameterKey); + LOGGER.warn(msg); + throw new PipeException(msg); + } + return patterns; + } + /** * Removes patterns from the list that are covered by other patterns in the same list. For * example, if "root.**" and "root.db.**" are present, "root.db.**" is removed. @@ -554,29 +634,47 @@ private static List pruneIrrelevantExclusions( private static List parsePatternsFromPatternParameter( final String pattern, final PipeParameters sourceParameters, - final boolean isTreeModelDataAllowedToBeCaptured) { + final boolean isTreeModelDataAllowedToBeCaptured, + final boolean allowMultiple, + final String parameterKey) { final String patternFormat = sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_FORMAT_KEY, SOURCE_PATTERN_FORMAT_KEY); + final List patterns; // If "source.pattern.format" is not specified, use prefix format by default. if (patternFormat == null) { - return parseMultiplePatterns( - pattern, p -> new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p)); + patterns = + parseMultiplePatterns( + pattern, p -> new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p)); + } else { + switch (patternFormat.toLowerCase()) { + case EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE: + patterns = + parseMultiplePatterns( + pattern, p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p)); + break; + case EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE: + patterns = + parseMultiplePatterns( + pattern, p -> new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p)); + break; + default: + LOGGER.info( + "Unknown pattern format: {}, use prefix matching format by default.", patternFormat); + patterns = + parseMultiplePatterns( + pattern, p -> new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p)); + } } - switch (patternFormat.toLowerCase()) { - case EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE: - return parseMultiplePatterns( - pattern, p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p)); - case EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE: - return parseMultiplePatterns( - pattern, p -> new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p)); - default: - LOGGER.info( - "Unknown pattern format: {}, use prefix matching format by default.", patternFormat); - return parseMultiplePatterns( - pattern, p -> new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p)); + if (!allowMultiple && patterns.size() > 1) { + final String msg = + String.format("Pipe: The parameter %s only supports a single pattern now.", parameterKey); + LOGGER.warn(msg); + throw new PipeException(msg); } + + return patterns; } private static List parseMultiplePatterns(