File tree Expand file tree Collapse file tree 5 files changed +30
-0
lines changed
paimon-core/src/main/java/org/apache/paimon/table
paimon-flink/paimon-flink-common/src
main/java/org/apache/paimon/flink/sink/coordinator
test/java/org/apache/paimon/flink/sink/coordinator Expand file tree Collapse file tree 5 files changed +30
-0
lines changed Original file line number Diff line number Diff line change @@ -121,6 +121,12 @@ public void setManifestCache(SegmentsCache<Path> manifestCache) {
121121 store ().setManifestCache (manifestCache );
122122 }
123123
124+ @ Nullable
125+ @ Override
126+ public SegmentsCache <Path > getManifestCache () {
127+ return manifestCache ;
128+ }
129+
124130 @ Override
125131 public void setSnapshotCache (Cache <Path , Snapshot > cache ) {
126132 this .snapshotCache = cache ;
Original file line number Diff line number Diff line change @@ -138,6 +138,12 @@ public void setManifestCache(SegmentsCache<Path> manifestCache) {
138138 wrapped .setManifestCache (manifestCache );
139139 }
140140
141+ @ Nullable
142+ @ Override
143+ public SegmentsCache <Path > getManifestCache () {
144+ return wrapped .getManifestCache ();
145+ }
146+
141147 @ Override
142148 public void setSnapshotCache (Cache <Path , Snapshot > cache ) {
143149 wrapped .setSnapshotCache (cache );
Original file line number Diff line number Diff line change @@ -56,6 +56,9 @@ public interface FileStoreTable extends DataTable {
5656
5757 void setManifestCache (SegmentsCache <Path > manifestCache );
5858
59+ @ Nullable
60+ SegmentsCache <Path > getManifestCache ();
61+
5962 void setSnapshotCache (Cache <Path , Snapshot > cache );
6063
6164 void setStatsCache (Cache <String , Statistics > cache );
Original file line number Diff line number Diff line change 3636import java .util .concurrent .ConcurrentHashMap ;
3737
3838import static org .apache .paimon .deletionvectors .DeletionVectorsIndexFile .DELETION_VECTORS_INDEX ;
39+ import static org .apache .paimon .utils .Preconditions .checkNotNull ;
3940import static org .apache .paimon .utils .SerializationUtils .deserializeBinaryRow ;
4041
4142/**
@@ -53,6 +54,7 @@ public class TableWriteCoordinator {
5354
5455 public TableWriteCoordinator (FileStoreTable table ) {
5556 this .table = table ;
57+ checkNotNull (table .getManifestCache ());
5658 this .latestCommittedIdentifiers = new ConcurrentHashMap <>();
5759 this .scan = table .store ().newScan ();
5860 if (table .coreOptions ().manifestDeleteFileDropStats ()) {
Original file line number Diff line number Diff line change 2626import org .apache .paimon .table .TableTestBase ;
2727import org .apache .paimon .types .DataTypes ;
2828
29+ import org .junit .jupiter .api .Test ;
2930import org .junit .jupiter .params .ParameterizedTest ;
3031import org .junit .jupiter .params .provider .ValueSource ;
3132
3233import static org .apache .paimon .data .BinaryRow .EMPTY_ROW ;
3334import static org .apache .paimon .utils .SerializationUtils .serializeBinaryRow ;
3435import static org .assertj .core .api .Assertions .assertThat ;
36+ import static org .assertj .core .api .Assertions .assertThatThrownBy ;
3537
3638class TableWriteCoordinatorTest extends TableTestBase {
3739
@@ -63,4 +65,15 @@ public void testLatestIdentifierAndScan(boolean initSnapshot) throws Exception {
6365 assertThat (scan .snapshot ().id ()).isEqualTo (latest .id ());
6466 assertThat (scan .extractDataFiles ().size ()).isEqualTo (initSnapshot ? 2 : 1 );
6567 }
68+
69+ @ Test
70+ public void testNoManifestCache () throws Exception {
71+ Identifier identifier = new Identifier ("db" , "table" );
72+ catalog .createDatabase ("db" , false );
73+ createTable (identifier );
74+ FileStoreTable table = getTable (identifier );
75+ table .setManifestCache (null );
76+ assertThatThrownBy (() -> new TableWriteCoordinator (table ))
77+ .isInstanceOf (NullPointerException .class );
78+ }
6679}
You can’t perform that action at this time.
0 commit comments