1616
1717package com .alibaba .fluss .flink .tiering .committer ;
1818
19+ import com .alibaba .fluss .client .Connection ;
20+ import com .alibaba .fluss .client .ConnectionFactory ;
21+ import com .alibaba .fluss .client .admin .Admin ;
1922import com .alibaba .fluss .config .Configuration ;
20- import com .alibaba .fluss .flink .tiering .event .FinishTieringEvent ;
23+ import com .alibaba .fluss .exception .LakeTableSnapshotNotExistException ;
24+ import com .alibaba .fluss .flink .tiering .event .FailedTieringEvent ;
25+ import com .alibaba .fluss .flink .tiering .event .FinishedTieringEvent ;
2126import com .alibaba .fluss .flink .tiering .source .TableBucketWriteResult ;
2227import com .alibaba .fluss .flink .tiering .source .TieringSource ;
28+ import com .alibaba .fluss .lake .committer .CommittedLakeSnapshot ;
2329import com .alibaba .fluss .lake .committer .LakeCommitter ;
2430import com .alibaba .fluss .lake .writer .LakeTieringFactory ;
2531import com .alibaba .fluss .lake .writer .LakeWriter ;
32+ import com .alibaba .fluss .metadata .PartitionInfo ;
2633import com .alibaba .fluss .metadata .TableBucket ;
34+ import com .alibaba .fluss .metadata .TableInfo ;
2735import com .alibaba .fluss .metadata .TablePath ;
36+ import com .alibaba .fluss .utils .ExceptionUtils ;
2837
2938import org .apache .flink .runtime .operators .coordination .OperatorEventGateway ;
3039import org .apache .flink .runtime .source .event .SourceEventWrapper ;
@@ -67,8 +76,11 @@ public class TieringCommitOperator<WriteResult, Committable>
6776
6877 private static final long serialVersionUID = 1L ;
6978
79+ private final Configuration flussConfig ;
7080 private final LakeTieringFactory <WriteResult , Committable > lakeTieringFactory ;
7181 private final FlussTableLakeSnapshotCommitter flussTableLakeSnapshotCommitter ;
82+ private Connection connection ;
83+ private Admin admin ;
7284
7385 // gateway to send event to flink source coordinator
7486 private final OperatorEventGateway operatorEventGateway ;
@@ -84,6 +96,7 @@ public TieringCommitOperator(
8496 this .lakeTieringFactory = lakeTieringFactory ;
8597 this .flussTableLakeSnapshotCommitter = new FlussTableLakeSnapshotCommitter (flussConf );
8698 this .collectedTableBucketWriteResults = new HashMap <>();
99+ this .flussConfig = flussConf ;
87100 this .setup (
88101 parameters .getContainingTask (),
89102 parameters .getStreamConfig (),
@@ -97,6 +110,8 @@ public TieringCommitOperator(
97110 @ Override
98111 public void open () {
99112 flussTableLakeSnapshotCommitter .open ();
113+ connection = ConnectionFactory .createConnection (flussConfig );
114+ admin = connection .getAdmin ();
100115 }
101116
102117 @ Override
@@ -112,16 +127,27 @@ public void processElement(StreamRecord<TableBucketWriteResult<WriteResult>> str
112127 collectTableAllBucketWriteResult (tableId );
113128
114129 if (committableWriteResults != null ) {
115- Committable committable =
116- commitWriteResults (
117- tableId , tableBucketWriteResult .tablePath (), committableWriteResults );
118- collectedTableBucketWriteResults .remove (tableId );
119- // notify that the table id has been finished tier
120- operatorEventGateway .sendEventToCoordinator (
121- new SourceEventWrapper (new FinishTieringEvent (tableId )));
122- // only emit when committable is not-null
123- if (committable != null ) {
124- output .collect (new StreamRecord <>(new CommittableMessage <>(committable )));
130+ try {
131+ Committable committable =
132+ commitWriteResults (
133+ tableId ,
134+ tableBucketWriteResult .tablePath (),
135+ committableWriteResults );
136+ // only emit when committable is not-null
137+ if (committable != null ) {
138+ output .collect (new StreamRecord <>(new CommittableMessage <>(committable )));
139+ }
140+ // notify that the table id has been finished tier
141+ operatorEventGateway .sendEventToCoordinator (
142+ new SourceEventWrapper (new FinishedTieringEvent (tableId )));
143+ } catch (Exception e ) {
144+ // if any exception happens, send to source coordinator to mark it as failed
145+ operatorEventGateway .sendEventToCoordinator (
146+ new SourceEventWrapper (
147+ new FailedTieringEvent (
148+ tableId , ExceptionUtils .stringifyException (e ))));
149+ } finally {
150+ collectedTableBucketWriteResults .remove (tableId );
125151 }
126152 }
127153 }
@@ -154,6 +180,8 @@ private Committable commitWriteResults(
154180 .collect (Collectors .toList ());
155181 // to committable
156182 Committable committable = lakeCommitter .toCommitable (writeResults );
183+ // before commit to lake, check fluss not missing any lake snapshot commited by fluss
184+ checkFlussNotMissingLakeSnapshot (tablePath , lakeCommitter , committable );
157185 long commitedSnapshotId = lakeCommitter .commit (committable );
158186 // commit to fluss
159187 Map <TableBucket , Long > logEndOffsets = new HashMap <>();
@@ -166,6 +194,61 @@ private Committable commitWriteResults(
166194 }
167195 }
168196
197+ private void checkFlussNotMissingLakeSnapshot (
198+ TablePath tablePath ,
199+ LakeCommitter <WriteResult , Committable > lakeCommitter ,
200+ Committable committable )
201+ throws Exception {
202+ Long flussCurrentLakeSnapshot ;
203+ try {
204+ flussCurrentLakeSnapshot = admin .getLatestLakeSnapshot (tablePath ).get ().getSnapshotId ();
205+ } catch (Exception e ) {
206+ Throwable throwable = e .getCause ();
207+ if (throwable instanceof LakeTableSnapshotNotExistException ) {
208+ // do-nothing
209+ flussCurrentLakeSnapshot = null ;
210+ } else {
211+ throw e ;
212+ }
213+ }
214+
215+ // get Fluss missing lake snapshot in Lake
216+ CommittedLakeSnapshot missingCommittedSnapshot =
217+ lakeCommitter .getMissingLakeSnapshot (flussCurrentLakeSnapshot );
218+
219+ // fluss's known snapshot is less than lake snapshot committed by fluss
220+ // fail this commit since the data is read from the log end-offset of a invalid fluss
221+ // known lake snapshot, which means the data already has been committed to lake,
222+ // not to commit to lake to avoid data duplicated
223+ if (missingCommittedSnapshot != null ) {
224+ // commit this missing snapshot to fluss
225+ TableInfo tableInfo = admin .getTableInfo (tablePath ).get ();
226+ Map <String , Long > partitionIdByName = null ;
227+ if (tableInfo .isPartitioned ()) {
228+ partitionIdByName =
229+ admin .listPartitionInfos (tablePath ).get ().stream ()
230+ .collect (
231+ Collectors .toMap (
232+ PartitionInfo ::getPartitionName ,
233+ PartitionInfo ::getPartitionId ));
234+ }
235+ flussTableLakeSnapshotCommitter .commit (
236+ tableInfo .getTableId (), partitionIdByName , missingCommittedSnapshot );
237+ // abort this committable to delete the written files
238+ lakeCommitter .abort (committable );
239+ throw new IllegalStateException (
240+ String .format (
241+ "The current Fluss's lake snapshot %d is less than"
242+ + " lake actual snapshot %d committed by Fluss for table: {tablePath=%s, tableId=%d},"
243+ + " missing snapshot: %s." ,
244+ flussCurrentLakeSnapshot ,
245+ missingCommittedSnapshot .getLakeSnapshotId (),
246+ tableInfo .getTablePath (),
247+ tableInfo .getTableId (),
248+ missingCommittedSnapshot ));
249+ }
250+ }
251+
169252 private void registerTableBucketWriteResult (
170253 long tableId , TableBucketWriteResult <WriteResult > tableBucketWriteResult ) {
171254 collectedTableBucketWriteResults
@@ -214,5 +297,11 @@ private List<TableBucketWriteResult<WriteResult>> collectTableAllBucketWriteResu
214297 @ Override
215298 public void close () throws Exception {
216299 flussTableLakeSnapshotCommitter .close ();
300+ if (admin != null ) {
301+ admin .close ();
302+ }
303+ if (connection != null ) {
304+ connection .close ();
305+ }
217306 }
218307}
0 commit comments