Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,7 @@ identityOrFunction
;

dataDesc
: ((WITH)? mergeType)? DATA INFILE LEFT_PAREN filePaths+=STRING_LITERAL (COMMA filePath+=STRING_LITERAL)* RIGHT_PAREN
: ((WITH)? mergeType)? DATA INFILE LEFT_PAREN filePaths+=STRING_LITERAL (COMMA filePaths+=STRING_LITERAL)* RIGHT_PAREN
(negative=NEGATIVE)?
INTO TABLE targetTableName=identifier
(partitionSpec)?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2275,7 +2275,7 @@ public LogicalPlan visitLoad(DorisParser.LoadContext ctx) {
for (Token filePath : ddc.filePaths) {
multiFilePaths.add(filePath.getText().substring(1, filePath.getText().length() - 1));
}
List<String> filePaths = ddc.filePath == null ? null : multiFilePaths;
List<String> filePaths = multiFilePaths.isEmpty() ? null : multiFilePaths;
List<Expression> colMappings;
if (ddc.columnMapping == null) {
colMappings = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,34 @@ public void testLoadCommandBitmap() {
Assertions.assertTrue(dataDescription.getColumnMappingList().get(1).child(0).getExpressionName().contains("userid_bitmap"));
}

/**
 * Parses a LOAD statement whose DATA INFILE clause lists three paths and checks
 * that the first data description reports all three, unquoted and in order.
 */
@Test
public void testLoadCommandWithMultipleFiles() {
    String[] expectedPaths = {
        "s3://bucket/customer/part-1",
        "s3://bucket/customer/part-2",
        "s3://bucket/customer/part-3",
    };

    String loadSql = "LOAD LABEL customer_multiple_files_test( "
            + " DATA INFILE(\"s3://bucket/customer/part-1\", "
            + " \"s3://bucket/customer/part-2\", "
            + " \"s3://bucket/customer/part-3\") "
            + " INTO TABLE customer"
            + " ) "
            + " WITH S3( "
            + " \"s3.access_key\" = \"AK\", "
            + " \"s3.secret_key\" = \"SK\", "
            + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", "
            + " \"s3.region\" = \"ap-beijing\");";

    // The statement must parse into at least one plan.
    List<Pair<LogicalPlan, StatementContext>> parsed = new NereidsParser().parseMultiple(loadSql);
    Assertions.assertFalse(parsed.isEmpty());

    // The first plan is the load command; it must carry at least one data description.
    LoadCommand loadCommand = (LoadCommand) parsed.get(0).first;
    List<NereidsDataDescription> descriptions = loadCommand.getDataDescriptions();
    Assertions.assertFalse(descriptions.isEmpty());

    // Every path listed in DATA INFILE must be present, in declaration order.
    List<String> actualPaths = descriptions.get(0).getFilePaths();
    Assertions.assertEquals(expectedPaths.length, actualPaths.size());
    for (int i = 0; i < expectedPaths.length; i++) {
        Assertions.assertEquals(expectedPaths[i], actualPaths.get(i));
    }
}

@Test
public void testLoadCommand() throws Exception {
String loadSql1 = "LOAD LABEL customer_lable_for_test( "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@
-- !pr22666_2 --
100490

-- !multi_infile_count --
200000
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,60 @@ suite("test_broker_load_multi_filegroup", "p0") {
order_qt_pr22666_1 """ select count(*) from ${tbl_22666} where p_brand is not null limit 10;"""
order_qt_pr22666_2 """ select count(*) from ${tbl_22666} where p_name is not null limit 10;"""

}
// Multi-INFILE case: a single DATA INFILE clause that lists two parquet files.
// Start from a clean target table so the row count checked later only reflects this load.
def tbl_multi_infile = "part_multi_infile"
sql """drop table if exists ${tbl_multi_infile} force"""
sql """
CREATE TABLE ${tbl_multi_infile} (
p_partkey int NULL,
p_name VARCHAR(55) NULL,
p_mfgr VARCHAR(25) NULL
)ENGINE=OLAP
DUPLICATE KEY(`p_partkey`)
DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
);
"""

// Build a per-run label: fixed prefix plus a random UUID whose dashes are replaced by "0".
def label_multi_infile = "part_multi_infile_" + UUID.randomUUID().toString().replace("-", "0")
// Submit one LOAD job that reads part0.parquet and part1.parquet through the same INFILE list
// and maps them onto the three table columns.
sql """
LOAD LABEL ${label_multi_infile} (
DATA INFILE(
"s3://${s3BucketName}/regression/load/data/part0.parquet",
"s3://${s3BucketName}/regression/load/data/part1.parquet"
)
INTO TABLE ${tbl_multi_infile}
FORMAT AS "PARQUET"
(p_partkey, p_name, p_mfgr)
)
WITH S3 (
"AWS_ACCESS_KEY" = "${getS3AK()}",
"AWS_SECRET_KEY" = "${getS3SK()}",
"AWS_ENDPOINT" = "${s3Endpoint}",
"AWS_REGION" = "${s3Region}",
"provider" = "${getS3Provider()}"
);
"""

// Poll the load job once per second, for at most 600000 ms, until it reports a final state.
max_try_milli_secs = 600000
while (max_try_milli_secs > 0) {
    def String[][] loadRows = sql """ show load where label="$label_multi_infile" order by createtime desc limit 1; """
    def latestJob = loadRows[0]
    logger.info("Load status: " + latestJob)

    // Column index 2 of the `show load` row is compared against the job state names below.
    def jobState = latestJob[2]
    if (jobState.equals("FINISHED")) {
        logger.info("Load FINISHED " + label_multi_infile)
        break;
    }
    if (jobState.equals("CANCELLED")) {
        assertTrue(false, "load failed: $loadRows")
        break;
    }

    // Not final yet: wait one second and charge it against the remaining budget.
    Thread.sleep(1000)
    max_try_milli_secs -= 1000
    if (max_try_milli_secs <= 0) {
        assertTrue(1 == 2, "load Timeout: $label_multi_infile")
    }
}

// Record the total row count of the target table for comparison with the expected output.
order_qt_multi_infile_count """ select count(*) from ${tbl_multi_infile};"""

}
Loading