@@ -595,7 +595,7 @@ public IcebergTableProcedure getTableProcedure(String procedureName) {
@Override
public Set<TableOperation> getSupportedOperations() {
return Sets.newHashSet(TableOperation.READ, TableOperation.INSERT, TableOperation.DROP, TableOperation.CREATE,
TableOperation.ALTER);
TableOperation.ALTER, TableOperation.DELETE);
}

public void setIcebergMetricsReporter(IcebergMetricsReporter reporter) {
158 changes: 158 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/planner/IcebergDeleteSink.java
@@ -0,0 +1,158 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.planner;

import com.starrocks.catalog.IcebergTable;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.iceberg.IcebergUtil;
import com.starrocks.credential.CloudConfiguration;
import com.starrocks.qe.SessionVariable;
import com.starrocks.thrift.TCompressionType;
import com.starrocks.thrift.TDataSink;
import com.starrocks.thrift.TDataSinkType;
import com.starrocks.thrift.TExplainLevel;
import com.starrocks.thrift.TIcebergTableSink;
import com.starrocks.type.IntegerType;
import com.starrocks.type.VarcharType;
import org.apache.iceberg.Table;

import static com.starrocks.sql.ast.OutFileClause.PARQUET_COMPRESSION_TYPE_MAP;

/**
* IcebergDeleteSink supports delete operations on Iceberg tables.
* It writes position delete files.
* <p>
* Required columns:
* - _file (STRING): Path of the data file
* - _pos (BIGINT): Row position within the file
*/
public class IcebergDeleteSink extends DataSink {
Contributor

Actually, you could make DeleteSink as a separate PR and then follow it with a delete plan PR.

protected final TupleDescriptor desc;
private IcebergTable icebergTable;
private final long targetTableId;
private final String tableLocation;
private final String dataLocation;
private final String compressionType;
private final long targetMaxFileSize;
private final String tableIdentifier;
private CloudConfiguration cloudConfiguration;

/**
* Constructor for IcebergDeleteSink
*
* @param icebergTable The target Iceberg table
* @param desc Tuple descriptor containing operation columns
* @param sessionVariable Session variables for configuration
*/
public IcebergDeleteSink(IcebergTable icebergTable, TupleDescriptor desc,
SessionVariable sessionVariable) {
this.icebergTable = icebergTable;
Table nativeTable = icebergTable.getNativeTable();
this.desc = desc;
this.tableLocation = nativeTable.location();
this.dataLocation = IcebergUtil.tableDataLocation(nativeTable);
this.targetTableId = icebergTable.getId();
this.tableIdentifier = icebergTable.getUUID();
this.compressionType = sessionVariable.getConnectorSinkCompressionCodec();
this.targetMaxFileSize = sessionVariable.getConnectorSinkTargetMaxFileSize() > 0 ?
sessionVariable.getConnectorSinkTargetMaxFileSize() : 1024L * 1024 * 1024;
}

public void init() {
String catalogName = icebergTable.getCatalogName();
this.cloudConfiguration = IcebergUtil.getVendedCloudConfiguration(catalogName, icebergTable);
// Validate tuple descriptor contains required columns
validateDeleteTuple(desc);
}

/**
* Validate that the tuple descriptor contains required columns
*
* @param desc The tuple descriptor to validate
*/
private void validateDeleteTuple(TupleDescriptor desc) {
boolean hasFilePathColumn = false;
boolean hasPosColumn = false;

for (SlotDescriptor slot : desc.getSlots()) {
if (slot.getColumn() != null) {
String colName = slot.getColumn().getName();
if (IcebergTable.FILE_PATH.equals(colName)) {
hasFilePathColumn = true;
if (!slot.getType().equals(VarcharType.VARCHAR)) {
throw new StarRocksConnectorException("_file column must be type of VARCHAR");
}
} else if (IcebergTable.ROW_POSITION.equals(colName)) {
hasPosColumn = true;
if (!slot.getType().equals(IntegerType.BIGINT)) {
throw new StarRocksConnectorException("_pos column must be type of BIGINT");
}
}
}
}

if (!hasFilePathColumn || !hasPosColumn) {
throw new StarRocksConnectorException("IcebergDeleteSink requires _file and _pos columns in tuple descriptor");
}
}

@Override
public String getExplainString(String prefix, TExplainLevel explainLevel) {
StringBuilder strBuilder = new StringBuilder();
strBuilder.append(prefix).append("ICEBERG DELETE SINK");
strBuilder.append("\n");
strBuilder.append(prefix).append(" TABLE: ").append(tableIdentifier).append("\n");
strBuilder.append(prefix).append(" LOCATION: ").append(tableLocation).append("\n");
strBuilder.append(prefix).append(" TUPLE ID: ").append(desc.getId()).append("\n");
return strBuilder.toString();
}

@Override
protected TDataSink toThrift() {
TDataSink tDataSink = new TDataSink(TDataSinkType.ICEBERG_DELETE_SINK);
TIcebergTableSink tIcebergTableSink = new TIcebergTableSink();
tIcebergTableSink.setTarget_table_id(targetTableId);
tIcebergTableSink.setTuple_id(desc.getId().asInt());
tIcebergTableSink.setLocation(tableLocation);
// For delete sink, we set both data and delete locations
tIcebergTableSink.setData_location(dataLocation);
tIcebergTableSink.setFile_format("parquet"); // This sink always writes delete files as Parquet
tIcebergTableSink.setIs_static_partition_sink(false);
TCompressionType compression = PARQUET_COMPRESSION_TYPE_MAP.get(compressionType);
tIcebergTableSink.setCompression_type(compression);
tIcebergTableSink.setTarget_max_file_size(targetMaxFileSize);
com.starrocks.thrift.TCloudConfiguration tCloudConfiguration = new com.starrocks.thrift.TCloudConfiguration();
cloudConfiguration.toThrift(tCloudConfiguration);
tIcebergTableSink.setCloud_configuration(tCloudConfiguration);

tDataSink.setIceberg_table_sink(tIcebergTableSink);
return tDataSink;
}

@Override
public PlanNodeId getExchNodeId() {
return null;
}

@Override
public DataPartition getOutputPartition() {
return null;
}

@Override
public boolean canUseRuntimeAdaptiveDop() {
return true;
}
}
102 changes: 99 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/sql/DeletePlanner.java
@@ -14,9 +14,11 @@

package com.starrocks.sql;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.IcebergTable;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.TableName;
@@ -25,6 +27,7 @@
import com.starrocks.load.Load;
import com.starrocks.planner.DataSink;
import com.starrocks.planner.DescriptorTable;
import com.starrocks.planner.IcebergDeleteSink;
import com.starrocks.planner.OlapTableSink;
import com.starrocks.planner.PlanFragment;
import com.starrocks.planner.SlotDescriptor;
@@ -34,12 +37,17 @@
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.ast.DeleteStmt;
import com.starrocks.sql.ast.QueryRelation;
import com.starrocks.sql.ast.expression.Expr;
import com.starrocks.sql.optimizer.OptExpression;
import com.starrocks.sql.optimizer.Optimizer;
import com.starrocks.sql.optimizer.OptimizerFactory;
import com.starrocks.sql.optimizer.base.ColumnRefFactory;
import com.starrocks.sql.optimizer.base.ColumnRefSet;
import com.starrocks.sql.optimizer.base.DistributionProperty;
import com.starrocks.sql.optimizer.base.DistributionSpec;
import com.starrocks.sql.optimizer.base.HashDistributionDesc;
import com.starrocks.sql.optimizer.base.PhysicalPropertySet;
import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
import com.starrocks.sql.optimizer.transformer.LogicalPlan;
import com.starrocks.sql.optimizer.transformer.RelationTransformer;
import com.starrocks.sql.plan.ExecPlan;
@@ -70,7 +78,16 @@ private ExecPlan planDelete(DeleteStmt deleteStatement, ConnectContext session)
ColumnRefFactory columnRefFactory = new ColumnRefFactory();
LogicalPlan logicalPlan = new RelationTransformer(columnRefFactory, session).transform(query);

PhysicalPropertySet requiredProperty = new PhysicalPropertySet();
// Determine physical properties based on table type
PhysicalPropertySet requiredProperty;
if (table instanceof IcebergTable) {
// For Iceberg, create shuffled property based on partitioning
List<ColumnRefOperator> outputColumns = logicalPlan.getOutputColumn();
requiredProperty = createShuffleProperty((IcebergTable) table, outputColumns);
} else {
// For other tables, use default empty property
requiredProperty = new PhysicalPropertySet();
}

// Optimize logical plan, create physical plan, setup sink and configure pipeline
return createDeletePlan(
@@ -117,8 +134,14 @@ private ExecPlan createDeletePlan(
logicalPlan.getOutputColumn(), columnRefFactory,
colNames, TResultSinkType.MYSQL_PROTOCAL, false);

// Setup OLAP table sink for delete operations
setupOlapTableSink(execPlan, deleteStatement, session);
// Create sink based on table type
if (table instanceof IcebergTable) {
setupIcebergDeleteSink(execPlan, colNames, (IcebergTable) table, session);
} else if (table instanceof OlapTable) {
setupOlapTableSink(execPlan, deleteStatement, session);
} else {
throw new SemanticException("Unsupported table type for delete: " + table.getType());
}
// Configure pipeline for the sink
configurePipelineSink(execPlan, session, table, canUsePipeline);

@@ -184,6 +207,37 @@ private void setupOlapTableSink(ExecPlan execPlan, DeleteStmt deleteStatement, C
}
}

/**
* Sets up Iceberg delete sink for delete operations
*/
private void setupIcebergDeleteSink(ExecPlan execPlan, List<String> colNames,
IcebergTable icebergTable, ConnectContext session) {
DescriptorTable descriptorTable = execPlan.getDescTbl();
TupleDescriptor mergeTuple = descriptorTable.createTupleDescriptor();

List<Expr> outputExprs = execPlan.getOutputExprs();
Preconditions.checkArgument(colNames.size() == outputExprs.size(),
"output column size mismatch");
for (int index = 0; index < colNames.size(); ++index) {
SlotDescriptor slot = descriptorTable.addSlotDescriptor(mergeTuple);
slot.setIsMaterialized(true);
slot.setType(outputExprs.get(index).getType());
slot.setColumn(new Column(colNames.get(index), outputExprs.get(index).getType()));
slot.setIsNullable(outputExprs.get(index).isNullable());
}
mergeTuple.computeMemLayout();

// Initialize IcebergDeleteSink
descriptorTable.addReferencedTable(icebergTable);
IcebergDeleteSink dataSink = new IcebergDeleteSink(
icebergTable,
mergeTuple,
session.getSessionVariable()
);
dataSink.init();
execPlan.getFragments().get(0).setSink(dataSink);
}

/**
* Configures pipeline for sink fragment
*/
@@ -205,4 +259,46 @@ private void configurePipelineSink(ExecPlan execPlan, ConnectContext session,
execPlan.getFragments().get(0).setPipelineDop(1);
}
}

Wrong sink flag set for Iceberg delete operations

The configurePipelineSink method unconditionally calls sinkFragment.setHasOlapTableSink() for all table types, including Iceberg tables. For Iceberg delete operations, this should call setHasIcebergTableSink() instead. The InsertPlanner correctly distinguishes between table types and sets the appropriate flag (setHasIcebergTableSink() for Iceberg tables). This mismatch could cause incorrect behavior in downstream code that checks which sink type is present using hasIcebergTableSink() vs hasOlapTableSink().

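A minimal sketch of the suggested fix, assuming the flag is set on the sink fragment inside configurePipelineSink (the full method body is not shown in this diff); setHasIcebergTableSink() is the flag InsertPlanner already sets for Iceberg sinks, and setHasOlapTableSink() is what the method currently calls unconditionally:

PlanFragment sinkFragment = execPlan.getFragments().get(0);
if (table instanceof IcebergTable) {
    // Mirror InsertPlanner: mark the fragment as carrying an Iceberg table sink
    sinkFragment.setHasIcebergTableSink();
} else {
    sinkFragment.setHasOlapTableSink();
}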

/**
* Creates a hash-distribution (shuffle) requirement on the Iceberg table's partition columns.
*
* @param icebergTable The Iceberg table
* @param outputColumns Output columns from the logical plan (includes virtual columns + partition columns)
* @return PhysicalPropertySet with shuffle requirement or empty property
*/
private PhysicalPropertySet createShuffleProperty(IcebergTable icebergTable,
List<ColumnRefOperator> outputColumns) {
// Check if table is partitioned
if (!icebergTable.isPartitioned()) {
// No shuffle for non-partitioned tables
return new PhysicalPropertySet();
}

List<String> partitionColNames = icebergTable.getPartitionColumnNames();
List<Integer> partitionColumnIds = Lists.newArrayList();
for (String partCol : partitionColNames) {
for (ColumnRefOperator outputCol : outputColumns) {
if (outputCol.getName().equalsIgnoreCase(partCol)) {
partitionColumnIds.add(outputCol.getId());
break;
}
}
}

if (partitionColumnIds.isEmpty()) {
// Partition column not in output, cannot shuffle
return new PhysicalPropertySet();
}

// Create HASH distribution spec
HashDistributionDesc distributionDesc = new HashDistributionDesc(
partitionColumnIds,
HashDistributionDesc.SourceType.SHUFFLE_AGG
);

DistributionProperty distributionProperty = DistributionProperty.createProperty(
DistributionSpec.createHashDistributionSpec(distributionDesc));

return new PhysicalPropertySet(distributionProperty);
}
}