Skip to content

Commit 37ea4f3

Browse files
authored
[BugFix] Fix txn log not exist when batch publish for shared-data arch (backport StarRocks#60949) (StarRocks#60977)
Signed-off-by: PengFei Li <[email protected]>
1 parent 8f5e638 commit 37ea4f3

File tree

3 files changed

+186
-19
lines changed

3 files changed

+186
-19
lines changed

fe/fe-core/src/main/java/com/starrocks/transaction/PublishVersionDaemon.java

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,9 @@ public boolean publishPartitionBatch(Database db, long tableId, PartitionPublish
472472
final List<Long> versions = publishVersionData.getCommitVersions();
473473
final List<TxnInfoPB> txnInfos = publishVersionData.getTxnInfos();
474474

475-
Map<Long, Set<Tablet>> shadowTabletsMap = new HashMap<>();
475+
// Record which transactions can be published in a batch for each shadow index.
476+
// The mapping is shadow index id -> ShadowIndexTxnBatch
477+
Map<Long, ShadowIndexTxnBatch> shadowIndexTxnBatches = null;
476478
Set<Tablet> normalTablets = null;
477479

478480
Locker locker = new Locker();
@@ -503,18 +505,22 @@ public boolean publishPartitionBatch(Database db, long tableId, PartitionPublish
503505
warehouseId = txnState.getWarehouseId();
504506
List<MaterializedIndex> indexes = txnState.getPartitionLoadedTblIndexes(table.getId(), partition);
505507
for (MaterializedIndex index : indexes) {
506-
if (!index.visibleForTransaction(txnState.getTransactionId())) {
507-
LOG.info("Ignored index {} for transaction {}", table.getIndexNameById(index.getId()),
508-
txnState.getTransactionId());
509-
continue;
510-
}
511508
if (index.getState() == MaterializedIndex.IndexState.SHADOW) {
512-
if (shadowTabletsMap.containsKey(versions.get(i))) {
513-
shadowTabletsMap.get(versions.get(i)).addAll(index.getTablets());
514-
} else {
515-
Set<Tablet> tabletsNew = new HashSet<>(index.getTablets());
516-
shadowTabletsMap.put(versions.get(i), tabletsNew);
509+
// sanity check. should not happen
510+
if (!index.visibleForTransaction(txnState.getTransactionId())) {
511+
LOG.warn("Ignore shadow index included in the transaction but not visible, " +
512+
"partitionId: {}, partitionName: {}, txnId: {}, indexId: {}, indexName: {}",
513+
partition.getId(), partition.getName(), txnState.getTransactionId(),
514+
index.getId(), table.getIndexNameById(index.getId()));
515+
continue;
516+
}
517+
if (shadowIndexTxnBatches == null) {
518+
shadowIndexTxnBatches = new HashMap<>();
517519
}
520+
ShadowIndexTxnBatch txnBatch =
521+
shadowIndexTxnBatches.computeIfAbsent(index.getId(),
522+
id -> new ShadowIndexTxnBatch(index.getTablets()));
523+
txnBatch.txnIds.add(txnState.getTransactionId());
518524
} else {
519525
normalTablets = (normalTablets == null) ? Sets.newHashSet() : normalTablets;
520526
normalTablets.addAll(index.getTablets());
@@ -529,13 +535,27 @@ public boolean publishPartitionBatch(Database db, long tableId, PartitionPublish
529535
long endVersion = versions.get(versions.size() - 1);
530536

531537
try {
532-
for (Map.Entry<Long, Set<Tablet>> item : shadowTabletsMap.entrySet()) {
533-
int index = versions.indexOf(item.getKey());
534-
List<Tablet> publishShadowTablets = new ArrayList<>(item.getValue());
535-
Utils.publishLogVersionBatch(publishShadowTablets,
536-
txnInfos.subList(index, txnInfos.size()),
537-
versions.subList(index, versions.size()),
538-
warehouseId);
538+
if (shadowIndexTxnBatches != null) {
539+
for (ShadowIndexTxnBatch txnBatch : shadowIndexTxnBatches.values()) {
540+
List<Tablet> shadowIndexTablets = txnBatch.tablets;
541+
if (shadowIndexTablets.isEmpty()) {
542+
continue;
543+
}
544+
List<TxnInfoPB> txnInfoList = txnInfos;
545+
List<Long> versionList = versions;
546+
if (txnBatch.txnIds.size() != transactionStates.size()) {
547+
txnInfoList = new ArrayList<>(txnBatch.txnIds.size());
548+
versionList = new ArrayList<>(txnBatch.txnIds.size());
549+
for (int i = 0; i < transactionStates.size(); i++) {
550+
TransactionState txnState = transactionStates.get(i);
551+
if (txnBatch.txnIds.contains(txnState.getTransactionId())) {
552+
txnInfoList.add(txnInfos.get(i));
553+
versionList.add(versions.get(i));
554+
}
555+
}
556+
}
557+
Utils.publishLogVersionBatch(shadowIndexTablets, txnInfoList, versionList, warehouseId);
558+
}
539559
}
540560
if (CollectionUtils.isNotEmpty(normalTablets)) {
541561
Map<Long, Double> compactionScores = new HashMap<>();
@@ -840,4 +860,16 @@ private boolean publishPartition(@NotNull Database db, @NotNull TableCommitInfo
840860
return false;
841861
}
842862
}
863+
864+
// Transactions that can be published in a batch for a partition of a shadow index
865+
private static class ShadowIndexTxnBatch {
866+
// the txn ids that include the shadow index
867+
Set<Long> txnIds = new HashSet<>();
868+
// the tablets in one partition of the shadow index
869+
List<Tablet> tablets = new ArrayList<>();
870+
871+
public ShadowIndexTxnBatch(List<Tablet> tablets) {
872+
this.tablets.addAll(tablets);
873+
}
874+
}
843875
}

fe/fe-core/src/test/java/com/starrocks/transaction/LakePublishBatchTest.java

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,34 @@
1616
package com.starrocks.transaction;
1717

1818
import com.google.common.collect.Lists;
19+
import com.starrocks.alter.AlterJobV2;
20+
import com.starrocks.alter.LakeTableSchemaChangeJob;
1921
import com.starrocks.catalog.Database;
2022
import com.starrocks.catalog.GlobalStateMgrTestUtil;
2123
import com.starrocks.catalog.MaterializedIndex;
24+
import com.starrocks.catalog.OlapTable;
2225
import com.starrocks.catalog.Partition;
26+
import com.starrocks.catalog.PhysicalPartition;
2327
import com.starrocks.catalog.Table;
2428
import com.starrocks.common.Config;
29+
import com.starrocks.common.util.UUIDUtil;
30+
import com.starrocks.lake.LakeTablet;
31+
import com.starrocks.proto.PublishLogVersionBatchRequest;
2532
import com.starrocks.qe.ConnectContext;
33+
import com.starrocks.rpc.BrpcProxy;
34+
import com.starrocks.rpc.LakeService;
2635
import com.starrocks.server.GlobalStateMgr;
2736
import com.starrocks.server.LocalMetastore;
2837
import com.starrocks.server.RunMode;
38+
import com.starrocks.server.WarehouseManager;
39+
import com.starrocks.sql.ast.AlterTableStmt;
40+
import com.starrocks.system.ComputeNode;
41+
import com.starrocks.utframe.MockedBackend;
2942
import com.starrocks.utframe.StarRocksAssert;
3043
import com.starrocks.utframe.UtFrameUtils;
3144
import mockit.Mock;
3245
import mockit.MockUp;
46+
import org.awaitility.Awaitility;
3347
import org.junit.jupiter.api.AfterAll;
3448
import org.junit.jupiter.api.Assertions;
3549
import org.junit.jupiter.api.BeforeAll;
@@ -38,24 +52,32 @@
3852
import java.util.List;
3953
import java.util.concurrent.TimeUnit;
4054

55+
import static org.junit.jupiter.api.Assertions.assertEquals;
56+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
57+
import static org.junit.jupiter.api.Assertions.assertNotNull;
58+
4159
public class LakePublishBatchTest {
4260
private static ConnectContext connectContext;
4361
private static StarRocksAssert starRocksAssert;
4462

4563
private static final String DB = "db_for_test";
4664
private static final String TABLE = "table_for_test";
65+
private static final String TABLE_SCHEMA_CHANGE = "table_for_test_schema_change";
4766
private TransactionState.TxnCoordinator transactionSource =
4867
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
4968

5069
private static boolean enable_batch_publish_version;
5170
private static int batch_publish_min_version_num;
71+
private static int alterSchedulerIntervalMs;
5272

5373
@BeforeAll
5474
public static void setUp() throws Exception {
5575
enable_batch_publish_version = Config.lake_enable_batch_publish_version;
5676
batch_publish_min_version_num = Config.lake_batch_publish_min_version_num;
77+
alterSchedulerIntervalMs = Config.alter_scheduler_interval_millisecond;
5778
Config.lake_enable_batch_publish_version = true;
5879
Config.lake_batch_publish_min_version_num = 2;
80+
Config.alter_scheduler_interval_millisecond = 100;
5981

6082
new MockUp<PublishVersionDaemon>() {
6183
@Mock
@@ -81,12 +103,18 @@ public void runOneCycle() {
81103
" PROPERTIES(\"replication_num\" = \"" + 3 +
82104
"\", \"storage_medium\" = \"SSD\")";
83105
starRocksAssert.withTable(sql);
106+
107+
String sql1 = "create table " + TABLE_SCHEMA_CHANGE +
108+
" (pk int NOT NULL, v0 int not null) primary KEY (pk) " +
109+
"DISTRIBUTED BY HASH(pk) BUCKETS 1;";
110+
starRocksAssert.withTable(sql1);
84111
}
85112

86113
@AfterAll
87114
public static void afterClass() {
88115
Config.lake_enable_batch_publish_version = enable_batch_publish_version;
89116
Config.lake_batch_publish_min_version_num = batch_publish_min_version_num;
117+
Config.alter_scheduler_interval_millisecond = alterSchedulerIntervalMs;
90118
}
91119

92120
@Test
@@ -369,4 +397,101 @@ public void testTransformBatchToSingle() throws Exception {
369397

370398
Config.lake_enable_batch_publish_version = true;
371399
}
400+
401+
@Test
402+
public void testBatchPublishShadowIndex() throws Exception {
403+
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(DB);
404+
Table table = GlobalStateMgr.getCurrentState().getLocalMetastore()
405+
.getTable(db.getFullName(), TABLE_SCHEMA_CHANGE);
406+
assertEquals(1, table.getPartitions().size());
407+
PhysicalPartition physicalPartition = table.getPartitions().iterator().next().getDefaultPhysicalPartition();
408+
List<MaterializedIndex> normalIndices =
409+
physicalPartition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE);
410+
assertEquals(1, normalIndices.size());
411+
MaterializedIndex normalIndex = normalIndices.get(0);
412+
assertEquals(1, normalIndex.getTablets().size());
413+
LakeTablet normalTablet = (LakeTablet) normalIndex.getTablets().get(0);
414+
415+
GlobalTransactionMgr globalTransactionMgr = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr();
416+
417+
// txn1 only includes tablets of base index
418+
long txn1 = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(table.getId()),
419+
"txn1" + "_" + UUIDUtil.genUUID().toString(), transactionSource,
420+
TransactionState.LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
421+
TransactionState txnState1 = globalTransactionMgr.getTransactionState(db.getId(), txn1);
422+
txnState1.addTableIndexes((OlapTable) table);
423+
List<TabletCommitInfo> commitInfo1 = commitAllTablets(List.of(normalTablet));
424+
425+
// do a schema change, which will create a shadow index
426+
String alterSql = String.format("alter table %s add index idx (v0) using bitmap", TABLE_SCHEMA_CHANGE);
427+
AlterTableStmt stmt = (AlterTableStmt) UtFrameUtils.parseStmtWithNewParser(alterSql, connectContext);
428+
GlobalStateMgr.getCurrentState().getLocalMetastore().alterTable(connectContext, stmt);
429+
List<AlterJobV2> alterJobs = GlobalStateMgr.getCurrentState().getAlterJobMgr()
430+
.getSchemaChangeHandler().getUnfinishedAlterJobV2ByTableId(table.getId());
431+
assertEquals(1, alterJobs.size());
432+
assertInstanceOf(LakeTableSchemaChangeJob.class, alterJobs.get(0));
433+
LakeTableSchemaChangeJob schemaChangeJob = (LakeTableSchemaChangeJob) alterJobs.get(0);
434+
Awaitility.await().atMost(60, TimeUnit.SECONDS).until(
435+
() -> schemaChangeJob.getJobState() == AlterJobV2.JobState.WAITING_TXN);
436+
437+
List<MaterializedIndex> shadowIndices =
438+
physicalPartition.getMaterializedIndices(MaterializedIndex.IndexExtState.SHADOW);
439+
assertEquals(1, shadowIndices.size());
440+
MaterializedIndex shadowIndex = shadowIndices.get(0);
441+
assertEquals(1, shadowIndex.getTablets().size());
442+
LakeTablet shadowTablet = (LakeTablet) shadowIndex.getTablets().get(0);
443+
444+
// txn2 includes tablets of both base index and shadow index
445+
long txn2 = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(table.getId()),
446+
"txn2" + "_" + UUIDUtil.genUUID().toString(), transactionSource,
447+
TransactionState.LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
448+
TransactionState txnState2 = globalTransactionMgr.getTransactionState(db.getId(), txn2);
449+
txnState2.addTableIndexes((OlapTable) table);
450+
List<TabletCommitInfo> commitInfo2 = commitAllTablets(List.of(normalTablet, shadowTablet));
451+
452+
// txn3 includes tablets of both base index and shadow index
453+
long txn3 = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(table.getId()),
454+
"txn3" + "_" + UUIDUtil.genUUID().toString(), transactionSource,
455+
TransactionState.LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
456+
TransactionState txnState3 = globalTransactionMgr.getTransactionState(db.getId(), txn3);
457+
txnState3.addTableIndexes((OlapTable) table);
458+
List<TabletCommitInfo> commitInfo3 = commitAllTablets(List.of(normalTablet, shadowTablet));
459+
460+
// commit in the order of txn2, tnx1, and txn3
461+
VisibleStateWaiter waiter2 = globalTransactionMgr.commitTransaction(db.getId(), txn2, commitInfo2,
462+
Lists.newArrayList(), null);
463+
VisibleStateWaiter waiter1 = globalTransactionMgr.commitTransaction(db.getId(), txn1, commitInfo1,
464+
Lists.newArrayList(), null);
465+
VisibleStateWaiter waiter3 = globalTransactionMgr.commitTransaction(db.getId(), txn3, commitInfo3,
466+
Lists.newArrayList(), null);
467+
468+
PublishVersionDaemon publishVersionDaemon = new PublishVersionDaemon();
469+
publishVersionDaemon.runAfterCatalogReady();
470+
471+
Assertions.assertTrue(waiter1.await(1, TimeUnit.MINUTES));
472+
Assertions.assertTrue(waiter2.await(1, TimeUnit.MINUTES));
473+
Assertions.assertTrue(waiter3.await(1, TimeUnit.MINUTES));
474+
475+
ComputeNode shadowTabletNode = GlobalStateMgr.getCurrentState().getWarehouseMgr()
476+
.getComputeNodeAssignedToTablet(WarehouseManager.DEFAULT_WAREHOUSE_ID, shadowTablet);
477+
LakeService lakeService = BrpcProxy.getLakeService(shadowTabletNode.getHost(), shadowTabletNode.getBrpcPort());
478+
assertInstanceOf(MockedBackend.MockLakeService.class, lakeService);
479+
MockedBackend.MockLakeService mockLakeService = (MockedBackend.MockLakeService) lakeService;
480+
PublishLogVersionBatchRequest request = mockLakeService.pollPublishLogVersionBatchRequests();
481+
assertNotNull(request);
482+
assertEquals(List.of(shadowTablet.getId()), request.getTabletIds());
483+
assertEquals(2, request.getTxnInfos().size());
484+
assertEquals(txn2, request.getTxnInfos().get(0).getTxnId());
485+
assertEquals(txn3, request.getTxnInfos().get(1).getTxnId());
486+
}
487+
488+
private List<TabletCommitInfo> commitAllTablets(List<LakeTablet> tablets) {
489+
List<TabletCommitInfo> commitInfos = Lists.newArrayList();
490+
List<Long> backends = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getBackendIds();
491+
for (LakeTablet tablet : tablets) {
492+
TabletCommitInfo tabletCommitInfo = new TabletCommitInfo(tablet.getId(), backends.get(0));
493+
commitInfos.add(tabletCommitInfo);
494+
}
495+
return commitInfos;
496+
}
372497
}

fe/fe-core/src/test/java/com/starrocks/utframe/MockedBackend.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@
141141
import java.util.concurrent.BlockingQueue;
142142
import java.util.concurrent.Callable;
143143
import java.util.concurrent.CompletableFuture;
144+
import java.util.concurrent.ConcurrentLinkedQueue;
144145
import java.util.concurrent.ExecutorService;
145146
import java.util.concurrent.Executors;
146147
import java.util.concurrent.Future;
@@ -551,7 +552,11 @@ public Future<PUpdateTransactionStateResponse> updateTransactionState(PUpdateTra
551552
}
552553
}
553554

554-
private static class MockLakeService implements LakeService {
555+
public static class MockLakeService implements LakeService {
556+
557+
private final ConcurrentLinkedQueue<PublishLogVersionBatchRequest> publishLogVersionBatchRequests =
558+
new ConcurrentLinkedQueue<>();
559+
555560
@Override
556561
public Future<PublishVersionResponse> publishVersion(PublishVersionRequest request) {
557562
return CompletableFuture.completedFuture(null);
@@ -599,9 +604,14 @@ public Future<PublishLogVersionResponse> publishLogVersion(PublishLogVersionRequ
599604

600605
@Override
601606
public Future<PublishLogVersionResponse> publishLogVersionBatch(PublishLogVersionBatchRequest request) {
607+
publishLogVersionBatchRequests.add(request);
602608
return CompletableFuture.completedFuture(null);
603609
}
604610

611+
public PublishLogVersionBatchRequest pollPublishLogVersionBatchRequests() {
612+
return publishLogVersionBatchRequests.poll();
613+
}
614+
605615
@Override
606616
public Future<LockTabletMetadataResponse> lockTabletMetadata(LockTabletMetadataRequest request) {
607617
return CompletableFuture.completedFuture(null);

0 commit comments

Comments
 (0)