File tree Expand file tree Collapse file tree 2 files changed +16
-12
lines changed
paimon-core/src/main/java/org/apache/paimon/mergetree/compact Expand file tree Collapse file tree 2 files changed +16
-12
lines changed Original file line number Diff line number Diff line change @@ -106,7 +106,7 @@ public void add(KeyValue kv) {
106106 public ChangelogResult getResult () {
107107 // 1. Find the latest high level record and compute containLevel0
108108 KeyValue highLevel = mergeFunction .pickHighLevel ();
109- boolean containLevel0 = mergeFunction . containLevel0 ();
109+ boolean containLevel0 = containLevel0 ();
110110
111111 // 2. Lookup if latest high level record is absent
112112 if (highLevel == null ) {
@@ -138,6 +138,15 @@ public ChangelogResult getResult() {
138138 return reusedResult .setResult (result );
139139 }
140140
141+ public boolean containLevel0 () {
142+ for (KeyValue kv : mergeFunction .candidates ()) {
143+ if (kv .level () == 0 ) {
144+ return true ;
145+ }
146+ }
147+ return false ;
148+ }
149+
141150 private void insertInto (LinkedList <KeyValue > candidates , KeyValue highLevel ) {
142151 List <KeyValue > newCandidates = new ArrayList <>();
143152 for (KeyValue candidate : candidates ) {
Original file line number Diff line number Diff line change @@ -53,7 +53,9 @@ public void add(KeyValue kv) {
5353 public KeyValue pickHighLevel () {
5454 KeyValue highLevel = null ;
5555 for (KeyValue kv : candidates ) {
56- if (kv .level () == 0 ) {
56+ // records that has not been stored on the disk yet, such as the data in the write
57+ // buffer being at level -1
58+ if (kv .level () <= 0 ) {
5759 continue ;
5860 }
5961 // For high-level comparison logic (not involving Level 0), only the value of the
@@ -65,15 +67,6 @@ public KeyValue pickHighLevel() {
6567 return highLevel ;
6668 }
6769
68- public boolean containLevel0 () {
69- for (KeyValue kv : candidates ) {
70- if (kv .level () == 0 ) {
71- return true ;
72- }
73- }
74- return false ;
75- }
76-
7770 public InternalRow key () {
7871 return candidates .get (0 ).key ();
7972 }
@@ -87,7 +80,9 @@ public KeyValue getResult() {
8780 mergeFunction .reset ();
8881 KeyValue highLevel = pickHighLevel ();
8982 for (KeyValue kv : candidates ) {
90- if (kv .level () == 0 || kv == highLevel ) {
83+ // records that has not been stored on the disk yet, such as the data in the write
84+ // buffer being at level -1
85+ if (kv .level () <= 0 || kv == highLevel ) {
9186 mergeFunction .add (kv );
9287 }
9388 }
You can’t perform that action at this time.
0 commit comments