Skip to content

Commit

Permalink
Pipe: Implemented runtime permission check (#14885)
Browse files Browse the repository at this point in the history
  • Loading branch information
Caideyipi authored Mar 7, 2025
1 parent ea43227 commit 5bb6803
Show file tree
Hide file tree
Showing 85 changed files with 2,297 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,12 @@ public static boolean hasDataBase(final String database, final BaseEnv baseEnv)
return true;
}

public static void assertCountDataAlwaysOnEnv(
final String database, final String table, final int count, final BaseEnv baseEnv) {
TestUtils.assertDataAlwaysOnEnv(
baseEnv, getQueryCountSql(table), "_col0,", Collections.singleton(count + ","), database);
}

public static void assertCountData(
final String database, final String table, final int count, final BaseEnv baseEnv) {
TestUtils.assertDataEventuallyOnEnv(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception {
extractorAttributes.put("extractor.realtime.mode", "log");
extractorAttributes.put("capture.table", "true");
extractorAttributes.put("capture.tree", "true");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "true");
Expand Down Expand Up @@ -171,6 +172,7 @@ private void testSinkFormat(final String format) throws Exception {

extractorAttributes.put("capture.table", "true");
extractorAttributes.put("capture.tree", "true");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "true");
Expand Down Expand Up @@ -261,11 +263,13 @@ public void testWriteBackSink() throws Exception {
extractorAttributes.put("forwarding-pipe-requests", "false");
extractorAttributes.put("extractor.database-name", "test.*");
extractorAttributes.put("extractor.table-name", "test.*");
extractorAttributes.put("user", "root");

processorAttributes.put("processor", "rename-database-processor");
processorAttributes.put("processor.new-db-name", "Test1");

connectorAttributes.put("connector", "write-back-sink");
connectorAttributes.put("user", "root");

final TSStatus status =
client.createPipe(
Expand Down Expand Up @@ -377,6 +381,7 @@ private void doTest(BiConsumer<Map<String, List<Tablet>>, Map<String, List<Table
extractorAttributes.put("capture.tree", "true");
extractorAttributes.put("extractor.database-name", "test.*");
extractorAttributes.put("extractor.table-name", "test.*");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.ip", receiverIp);
Expand Down Expand Up @@ -734,6 +739,7 @@ public void testLoadTsFileWithoutVerify() throws Exception {
extractorAttributes.put("extractor.realtime.mode", "batch");
extractorAttributes.put("capture.table", "true");
extractorAttributes.put("capture.tree", "true");
extractorAttributes.put("user", "root");

connectorAttributes.put("sink", "iotdb-thrift-sink");
connectorAttributes.put("sink.batch.enable", "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public void testMatchingMultipleDatabases() throws Exception {
extractorAttributes.put("extractor.table-name", "test");
extractorAttributes.put("extractor.pattern", "root.db1");
extractorAttributes.put("extractor.inclusion", "data.insert");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -264,6 +265,7 @@ public void testHistoryAndRealtime() throws Exception {
extractorAttributes.put("extractor.pattern", "root.db.d2");
extractorAttributes.put("extractor.history.enable", "false");
extractorAttributes.put("extractor.realtime.enable", "true");
extractorAttributes.put("user", "root");
TSStatus status =
client.createPipe(
new TCreatePipeReq("p2", connectorAttributes)
Expand Down Expand Up @@ -401,6 +403,7 @@ public void testHistoryStartTimeAndEndTimeWorkingWithOrWithoutPattern() throws E
// 1970-01-01T08:00:02+08:00
extractorAttributes.put("extractor.history.start-time", "2");
extractorAttributes.put("extractor.history.end-time", "3");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -498,6 +501,7 @@ public void testExtractorTimeRangeMatch() throws Exception {
extractorAttributes.put("source.inclusion", "data.insert");
extractorAttributes.put("source.start-time", "2");
extractorAttributes.put("source.end-time", "4");
extractorAttributes.put("user", "root");

final TSStatus status =
client.createPipe(
Expand Down Expand Up @@ -616,6 +620,7 @@ public void testSourceStartTimeAndEndTimeWorkingWithOrWithoutPattern() throws Ex
extractorAttributes.put("source.start-time", "2");
// 1970-01-01T08:00:04+08:00
extractorAttributes.put("source.end-time", "4");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -733,6 +738,7 @@ public void testHistoryLooseRange() throws Exception {
extractorAttributes.put("source.history.start-time", "1");
extractorAttributes.put("source.history.end-time", "2");
extractorAttributes.put("source.history.loose-range", "time, path");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -789,6 +795,7 @@ public void testRealtimeLooseRange() throws Exception {
extractorAttributes.put("source.start-time", "2");
extractorAttributes.put("source.end-time", "10");
extractorAttributes.put("source.realtime.mode", "batch");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void testLifeCycleWithHistoryEnabled() throws Exception {
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("capture.table", "true");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -161,6 +162,7 @@ public void testLifeCycleWithHistoryDisabled() throws Exception {
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("capture.table", "true");
extractorAttributes.put("user", "root");
extractorAttributes.put("extractor.inclusion", "data.insert");
extractorAttributes.put("extractor.inclusion.exclusion", "");

Expand Down Expand Up @@ -224,6 +226,7 @@ public void testLifeCycleLogMode() throws Exception {

extractorAttributes.put("capture.table", "true");
extractorAttributes.put("extractor.mode", "forced-log");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -286,6 +289,7 @@ public void testLifeCycleFileMode() throws Exception {

extractorAttributes.put("capture.table", "true");
extractorAttributes.put("mode.streaming", "false");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -347,6 +351,7 @@ public void testLifeCycleHybridMode() throws Exception {

extractorAttributes.put("capture.table", "true");
extractorAttributes.put("extractor.mode", "hybrid");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -404,6 +409,7 @@ public void testLifeCycleWithClusterRestart() throws Exception {
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("capture.table", "true");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -474,6 +480,7 @@ public void testReceiverRestartWhenTransferring() throws Exception {
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("capture.table", "true");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -557,6 +564,7 @@ public void testReceiverAlreadyHaveTimeSeries() throws Exception {
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("capture.table", "true");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -625,6 +633,7 @@ public void testDoubleLiving() throws Exception {
// Add this property to avoid to make self cycle.
extractorAttributes.put("capture.table", "true");
extractorAttributes.put("forwarding-pipe-requests", "false");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -665,6 +674,7 @@ public void testDoubleLiving() throws Exception {
extractorAttributes.put("capture.table", "true");
extractorAttributes.put("capture.tree", "true");
extractorAttributes.put("forwarding-pipe-requests", "false");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down
Loading

0 comments on commit 5bb6803

Please sign in to comment.