4545import com .alibaba .fluss .rpc .protocol .ApiError ;
4646import com .alibaba .fluss .server .coordinator .event .AccessContextEvent ;
4747import com .alibaba .fluss .server .coordinator .event .AdjustIsrReceivedEvent ;
48+ import com .alibaba .fluss .server .coordinator .event .AutoPreferredReplicaLeaderElection ;
4849import com .alibaba .fluss .server .coordinator .event .CommitKvSnapshotEvent ;
4950import com .alibaba .fluss .server .coordinator .event .CommitLakeTableSnapshotEvent ;
5051import com .alibaba .fluss .server .coordinator .event .CommitRemoteLogManifestEvent ;
8788import com .alibaba .fluss .server .zk .data .TabletServerRegistration ;
8889import com .alibaba .fluss .server .zk .data .ZkData .PartitionIdsZNode ;
8990import com .alibaba .fluss .server .zk .data .ZkData .TableIdsZNode ;
91+ import com .alibaba .fluss .utils .concurrent .Scheduler ;
9092import com .alibaba .fluss .utils .types .Tuple2 ;
9193
9294import org .slf4j .Logger ;
9799
98100import java .util .ArrayList ;
99101import java .util .Collections ;
102+ import java .util .HashMap ;
100103import java .util .HashSet ;
101104import java .util .List ;
102105import java .util .Map ;
108111
109112import static com .alibaba .fluss .server .coordinator .statemachine .BucketState .OfflineBucket ;
110113import static com .alibaba .fluss .server .coordinator .statemachine .BucketState .OnlineBucket ;
114+ import static com .alibaba .fluss .server .coordinator .statemachine .ReplicaLeaderElectionAlgorithms .preferredReplicaLeaderElection ;
111115import static com .alibaba .fluss .server .coordinator .statemachine .ReplicaLeaderElectionStrategy .CONTROLLED_SHUTDOWN_ELECTION ;
116+ import static com .alibaba .fluss .server .coordinator .statemachine .ReplicaLeaderElectionStrategy .PREFERRED_LEADER_ELECTION ;
112117import static com .alibaba .fluss .server .coordinator .statemachine .ReplicaState .OfflineReplica ;
113118import static com .alibaba .fluss .server .coordinator .statemachine .ReplicaState .OnlineReplica ;
114119import static com .alibaba .fluss .server .coordinator .statemachine .ReplicaState .ReplicaDeletionStarted ;
@@ -138,9 +143,12 @@ public class CoordinatorEventProcessor implements EventProcessor {
138143 private final CoordinatorRequestBatch coordinatorRequestBatch ;
139144 private final CoordinatorMetricGroup coordinatorMetricGroup ;
140145 private final String internalListenerName ;
146+ private final Configuration conf ;
141147
142148 private final CompletedSnapshotStoreManager completedSnapshotStoreManager ;
143149
150+ private final Scheduler scheduler ;
151+
144152 // metrics
145153 private volatile int tabletServerCount ;
146154 private volatile int offlineBucketCount ;
@@ -157,7 +165,8 @@ public CoordinatorEventProcessor(
157165 LakeTableTieringManager lakeTableTieringManager ,
158166 CoordinatorMetricGroup coordinatorMetricGroup ,
159167 Configuration conf ,
160- ExecutorService ioExecutor ) {
168+ ExecutorService ioExecutor ,
169+ Scheduler scheduler ) {
161170 this .zooKeeperClient = zooKeeperClient ;
162171 this .serverMetadataCache = serverMetadataCache ;
163172 this .coordinatorChannelManager = coordinatorChannelManager ;
@@ -202,7 +211,9 @@ public CoordinatorEventProcessor(
202211 this .autoPartitionManager = autoPartitionManager ;
203212 this .lakeTableTieringManager = lakeTableTieringManager ;
204213 this .coordinatorMetricGroup = coordinatorMetricGroup ;
214+ this .conf = conf ;
205215 this .internalListenerName = conf .getString (ConfigOptions .INTERNAL_LISTENER_NAME );
216+ this .scheduler = scheduler ;
206217 registerMetrics ();
207218 }
208219
@@ -250,6 +261,10 @@ public void startup() {
250261
251262 // start the event manager which will then process the event
252263 coordinatorEventManager .start ();
264+
265+ if (conf .getBoolean (ConfigOptions .AUTO_LEADER_REBALANCE_ENABLE )) {
266+ scheduleAutoLeaderRebalanceTask (5000 );
267+ }
253268 }
254269
255270 public void shutdown () {
@@ -283,6 +298,13 @@ private ServerInfo getCoordinatorServerInfo() {
283298 }
284299 }
285300
301+ private void scheduleAutoLeaderRebalanceTask (long delayMs ) {
302+ scheduler .scheduleOnce (
303+ "auto-leader-rebalance-task" ,
304+ () -> coordinatorEventManager .put (new AutoPreferredReplicaLeaderElection ()),
305+ delayMs );
306+ }
307+
286308 public int getCoordinatorEpoch () {
287309 return coordinatorContext .getCoordinatorEpoch ();
288310 }
@@ -505,6 +527,8 @@ public void process(CoordinatorEvent event) {
505527 } else if (event instanceof AccessContextEvent ) {
506528 AccessContextEvent <?> accessContextEvent = (AccessContextEvent <?>) event ;
507529 processAccessContext (accessContextEvent );
530+ } else if (event instanceof AutoPreferredReplicaLeaderElection ) {
531+ processAutoPreferredReplicaLeaderElection ();
508532 } else {
509533 LOG .warn ("Unknown event type: {}" , event .getClass ().getName ());
510534 }
@@ -1053,6 +1077,88 @@ private <T> void processAccessContext(AccessContextEvent<T> event) {
10531077 }
10541078 }
10551079
1080+ private void processAutoPreferredReplicaLeaderElection () {
1081+ try {
1082+ LOG .info ("Processing automatic preferred replica leader election" );
1083+ checkAndTriggerAutoLeaderRebalance ();
1084+ } finally {
1085+ scheduleAutoLeaderRebalanceTask (
1086+ conf .getInt (ConfigOptions .LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS ) * 1000L );
1087+ }
1088+ }
1089+
1090+ private void checkAndTriggerAutoLeaderRebalance () {
1091+ LOG .trace ("Checking need to trigger auto leader balancing" );
1092+ Map <Integer , Map <TableBucket , List <Integer >>> preferredReplicasForTopicsByTabletServers =
1093+ new HashMap <>();
1094+ coordinatorContext .allBuckets ().stream ()
1095+ .filter (tb -> !coordinatorContext .isToBeDeleted (tb ))
1096+ .collect (Collectors .toMap (tb -> tb , coordinatorContext ::getAssignment ))
1097+ .forEach (
1098+ (tb , assignment ) ->
1099+ preferredReplicasForTopicsByTabletServers
1100+ .computeIfAbsent (assignment .get (0 ), k -> new HashMap <>())
1101+ .put (tb , assignment ));
1102+
1103+ // for each tablet server, check if a preferred replica election needs to be triggered
1104+ for (Map .Entry <Integer , Map <TableBucket , List <Integer >>> entry :
1105+ preferredReplicasForTopicsByTabletServers .entrySet ()) {
1106+ int leader = entry .getKey ();
1107+ Set <TableBucket > tableBucketsNotInPreferredReplica = new HashSet <>();
1108+ for (TableBucket tableBucket : entry .getValue ().keySet ()) {
1109+ Optional <LeaderAndIsr > leaderAndIsrOp =
1110+ coordinatorContext .getBucketLeaderAndIsr (tableBucket );
1111+ leaderAndIsrOp
1112+ .filter (leaderAndIsr -> leaderAndIsr .leader () != leader )
1113+ .ifPresent (
1114+ leaderAndIsr -> tableBucketsNotInPreferredReplica .add (tableBucket ));
1115+ }
1116+ LOG .debug (
1117+ "Table buckets not in preferred replica for tablet server {} {}" ,
1118+ leader ,
1119+ tableBucketsNotInPreferredReplica );
1120+
1121+ double imbalanceRatio =
1122+ (double ) tableBucketsNotInPreferredReplica .size () / entry .getValue ().size ();
1123+ LOG .trace ("Leader imbalance ratio for tablet server {} is {}" , leader , imbalanceRatio );
1124+
1125+ // check ratio and if greater than desired ratio, trigger a rebalance for the table
1126+ // buckets
1127+ // that need to be on this tablet server
1128+ if (imbalanceRatio
1129+ > ((double )
1130+ conf .getInt (
1131+ ConfigOptions
1132+ .LEADER_IMBALANCE_PER_TABLET_SERVER_PERCENTAGE )
1133+ / 100 )) {
1134+ // do this check only if the tablet server is live and preferred replica election is
1135+ // not in progress
1136+ Set <TableBucket > candidateTableBuckets =
1137+ tableBucketsNotInPreferredReplica .stream ()
1138+ .filter (
1139+ tb ->
1140+ !coordinatorContext .isToBeDeleted (tb )
1141+ && coordinatorContext
1142+ .allBuckets ()
1143+ .contains (tb )
1144+ && canPreferredReplicaBeLeader (tb ))
1145+ .collect (Collectors .toSet ());
1146+ tableBucketStateMachine .handleStateChange (
1147+ candidateTableBuckets , OnlineBucket , PREFERRED_LEADER_ELECTION );
1148+ }
1149+ }
1150+ }
1151+
1152+ private boolean canPreferredReplicaBeLeader (TableBucket tableBucket ) {
1153+ List <Integer > assignment = coordinatorContext .getAssignment (tableBucket );
1154+ List <Integer > liveReplicas =
1155+ assignment .stream ()
1156+ .filter (replica -> coordinatorContext .isReplicaOnline (replica , tableBucket ))
1157+ .collect (Collectors .toList ());
1158+ List <Integer > isr = coordinatorContext .getBucketLeaderAndIsr (tableBucket ).get ().isr ();
1159+ return preferredReplicaLeaderElection (assignment , liveReplicas , isr ).isPresent ();
1160+ }
1161+
10561162 private CommitLakeTableSnapshotResponse tryProcessCommitLakeTableSnapshot (
10571163 CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent ) {
10581164 CommitLakeTableSnapshotData commitLakeTableSnapshotData =
0 commit comments