Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,37 @@
import java.util.Set;
import java.util.stream.Collectors;

/**
* Schema Change Job implementation for the storage-compute separation (cloud) architecture.
*
* <p>In storage-compute separation mode, tablet metadata is managed by the MetaService rather
* than directly on BE local disk. This class overrides the key lifecycle methods of
* {@link SchemaChangeJobV2} to interact with the MetaService via RPC calls through
* {@link CloudInternalCatalog} instead of sending tasks directly to BEs.
*
* <p>The shadow index lifecycle of a schema change job in storage-compute separation mode:
* <ol>
* <li>{@link #createShadowIndexReplica()} - calls MetaService to prepare and create shadow
* tablets for each partition via {@code prepareMaterializedIndex} and
* {@code sendCreateTabletsRpc}.</li>
* <li>BE executes ALTER tasks to physically rewrite data files with the new index.</li>
* <li>{@link #commitShadowIndex()} - calls MetaService {@code commitMaterializedIndex}
* to atomically promote the shadow index to a visible index.</li>
* <li>{@link #postProcessOriginIndex()} - calls MetaService {@code dropMaterializedIndex}
* to drop the old index and free cloud storage space.</li>
* </ol>
*
* <p>On cancellation, {@link #onCancel()} calls MetaService to remove the shadow index
* and clean up any partially created SchemaChangeJob records.
*/
public class CloudSchemaChangeJobV2 extends SchemaChangeJobV2 {
// Logger is keyed on this class (not the parent SchemaChangeJobV2) so that log lines
// emitted here are attributed to CloudSchemaChangeJobV2.
private static final Logger LOG = LogManager.getLogger(CloudSchemaChangeJobV2.class);

/**
* Creates a new CloudSchemaChangeJobV2 and binds it to the current compute group (cloud cluster).
* The compute group name is captured from {@link ConnectContext} at creation time and used later
* by {@link #ensureCloudClusterExist(List)} to verify that the cluster is still available.
*/
public CloudSchemaChangeJobV2(String rawSql, long jobId, long dbId, long tableId,
String tableName, long timeoutMs) {
super(rawSql, jobId, dbId, tableId, tableName, timeoutMs);
Expand All @@ -80,6 +108,9 @@ private CloudSchemaChangeJobV2() {}

@Override
protected void commitShadowIndex() throws AlterCancelException {
// In storage-compute separation mode, the shadow index promotion is done by notifying
// MetaService via commitMaterializedIndex RPC. MetaService atomically switches the
// shadow index to a visible (committed) state so that subsequent queries can use it.
List<Long> shadowIdxList =
indexIdMap.keySet().stream().collect(Collectors.toList());
try {
Expand All @@ -100,6 +131,10 @@ protected void onCancel() {
return;
}

// In storage-compute separation mode, cancellation requires two steps:
// 1. Drop the shadow index tablets from MetaService (dropMaterializedIndex RPC).
// 2. Remove each SchemaChangeJob record from MetaService for every
// (partition, originTablet, shadowTablet) combination (removeSchemaChangeJob RPC).
List<Long> shadowIdxList = indexIdMap.keySet().stream().collect(Collectors.toList());
dropIndex(shadowIdxList);

Expand Down Expand Up @@ -141,10 +176,16 @@ protected void postProcessOriginIndex() {
return;
}

// After the shadow index has been committed, drop the original index from MetaService
// to free up cloud storage space occupied by the old index data.
List<Long> originIdxList = indexIdMap.values().stream().collect(Collectors.toList());
dropIndex(originIdxList);
}

/**
* Drops the given index list from MetaService with retry logic.
* Used for both cancellation (dropping shadow indexes) and post-processing (dropping origin indexes).
*/
private void dropIndex(List<Long> idxList) {
int tryTimes = 1;
while (true) {
Expand All @@ -164,6 +205,21 @@ private void dropIndex(List<Long> idxList) {
dbId, tableId, jobId, idxList);
}

/**
* Creates shadow index replicas in storage-compute separation mode.
*
* <p>Unlike the local mode which directly creates tablet replicas on BE nodes,
* this method:
* <ol>
* <li>Calls {@code prepareMaterializedIndex} RPC to reserve the shadow index slot
* in MetaService with an expiration timestamp.</li>
* <li>Builds {@code TabletMetaCloudPB} for each shadow tablet in each partition and
* sends them to MetaService via {@code sendCreateTabletsRpc} to persist the
* tablet metadata in cloud storage.</li>
* <li>Adds the shadow indexes to the FE in-memory catalog so that BE nodes can
* discover them when processing the ALTER tasks.</li>
* </ol>
*/
@Override
protected void createShadowIndexReplica() throws AlterCancelException {
Database db = Env.getCurrentInternalCatalog()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,24 @@
import java.util.Set;
import java.util.stream.Collectors;

/**
* Schema Change handler for the storage-compute separation (cloud) architecture.
*
* <p>This class extends {@link SchemaChangeHandler} and overrides methods that need
* to interact with cloud infrastructure (MetaService) rather than managing tablet
* replicas directly on BE local disks.
*
* <p>In storage-compute separation mode, tablet metadata (including index schema) is
* managed by the MetaService. When creating an index (e.g. {@code ALTER TABLE ... ADD INDEX}),
* the handler delegates to {@link CloudSchemaChangeJobV2} which communicates with
* MetaService via {@link org.apache.doris.cloud.datasource.CloudInternalCatalog} RPCs.
*
* <p>This handler is activated when {@code Config.isCloudMode()} returns {@code true}.
* It is instantiated by {@link org.apache.doris.alter.Alter} at startup.
*
* @see CloudSchemaChangeJobV2
* @see org.apache.doris.alter.SchemaChangeHandler
*/
public class CloudSchemaChangeHandler extends SchemaChangeHandler {
private static final Logger LOG = LogManager.getLogger(CloudSchemaChangeHandler.class);

Expand Down
Loading