3030
3131import javax .annotation .Nullable ;
3232
33+ import java .util .ArrayList ;
3334import java .util .Comparator ;
34- import java .util .Iterator ;
3535import java .util .LinkedList ;
3636import java .util .List ;
3737import java .util .function .Function ;
5656public class LookupChangelogMergeFunctionWrapper <T >
5757 implements MergeFunctionWrapper <ChangelogResult > {
5858
59- private final MergeFunction < KeyValue > mergeFunction ;
59+ private final LookupMergeFunction mergeFunction ;
6060 private final Function <InternalRow , T > lookup ;
6161
6262 private final ChangelogResult reusedResult = new ChangelogResult ();
6363 private final KeyValue reusedBefore = new KeyValue ();
6464 private final KeyValue reusedAfter = new KeyValue ();
6565 @ Nullable private final RecordEqualiser valueEqualiser ;
6666 private final LookupStrategy lookupStrategy ;
67- private final @ Nullable DeletionVectorsMaintainer deletionVectorsMaintainer ;
67+ private final @ Nullable DeletionVectorsMaintainer dvMaintainer ;
6868 private final Comparator <KeyValue > comparator ;
6969
70- private final LinkedList <KeyValue > candidates = new LinkedList <>();
71-
7270 public LookupChangelogMergeFunctionWrapper (
7371 MergeFunctionFactory <KeyValue > mergeFunctionFactory ,
7472 Function <InternalRow , T > lookup ,
7573 @ Nullable RecordEqualiser valueEqualiser ,
7674 LookupStrategy lookupStrategy ,
77- @ Nullable DeletionVectorsMaintainer deletionVectorsMaintainer ,
75+ @ Nullable DeletionVectorsMaintainer dvMaintainer ,
7876 @ Nullable UserDefinedSeqComparator userDefinedSeqComparator ) {
77+ if (lookupStrategy .deletionVector ) {
78+ checkArgument (
79+ dvMaintainer != null ,
80+ "deletionVectorsMaintainer should not be null, there is a bug." );
81+ }
7982 MergeFunction <KeyValue > mergeFunction = mergeFunctionFactory .create ();
8083 checkArgument (
8184 mergeFunction instanceof LookupMergeFunction ,
8285 "Merge function should be a LookupMergeFunction, but is %s, there is a bug." ,
8386 mergeFunction .getClass ().getName ());
84- if (lookupStrategy .deletionVector ) {
85- checkArgument (
86- deletionVectorsMaintainer != null ,
87- "deletionVectorsMaintainer should not be null, there is a bug." );
88- }
89- this .mergeFunction = mergeFunctionFactory .create ();
87+ this .mergeFunction = (LookupMergeFunction ) mergeFunction ;
9088 this .lookup = lookup ;
9189 this .valueEqualiser = valueEqualiser ;
9290 this .lookupStrategy = lookupStrategy ;
93- this .deletionVectorsMaintainer = deletionVectorsMaintainer ;
91+ this .dvMaintainer = dvMaintainer ;
9492 this .comparator = createSequenceComparator (userDefinedSeqComparator );
9593 }
9694
9795 @ Override
9896 public void reset () {
99- candidates . clear ();
97+ mergeFunction . reset ();
10098 }
10199
102100 @ Override
103101 public void add (KeyValue kv ) {
104- candidates .add (kv );
102+ mergeFunction .add (kv );
105103 }
106104
107105 @ Override
108106 public ChangelogResult getResult () {
109- // 1. Compute the latest high level record and containLevel0 of candidates
110- Iterator <KeyValue > descending = candidates .descendingIterator ();
111- KeyValue highLevel = null ;
112- boolean containLevel0 = false ;
113- while (descending .hasNext ()) {
114- KeyValue kv = descending .next ();
115- if (kv .level () > 0 ) {
116- descending .remove ();
117- if (highLevel == null || kv .level () < highLevel .level ()) {
118- highLevel = kv ;
119- }
120- } else {
121- containLevel0 = true ;
122- }
123- }
107+ // 1. Find the latest high level record and compute containLevel0
108+ KeyValue highLevel = mergeFunction .pickHighLevel ();
109+ boolean containLevel0 = mergeFunction .containLevel0 ();
124110
125111 // 2. Lookup if latest high level record is absent
126112 if (highLevel == null ) {
127- InternalRow lookupKey = candidates .get (0 ).key ();
128- T lookupResult = lookup .apply (lookupKey );
113+ T lookupResult = lookup .apply (mergeFunction .key ());
129114 if (lookupResult != null ) {
130115 if (lookupStrategy .deletionVector ) {
131116 PositionedKeyValue positionedKeyValue = (PositionedKeyValue ) lookupResult ;
132117 highLevel = positionedKeyValue .keyValue ();
133- deletionVectorsMaintainer .notifyNewDeletion (
118+ dvMaintainer .notifyNewDeletion (
134119 positionedKeyValue .fileName (), positionedKeyValue .rowPosition ());
135120 } else {
136121 highLevel = (KeyValue ) lookupResult ;
137122 }
138123 }
124+ if (highLevel != null ) {
125+ insertInto (mergeFunction .candidates (), highLevel );
126+ }
139127 }
140128
141129 // 3. Calculate result
142- KeyValue result = calculateResult ( candidates , highLevel );
130+ KeyValue result = mergeFunction . getResult ( );
143131
144132 // 4. Set changelog when there's level-0 records
145133 reusedResult .reset ();
@@ -150,21 +138,22 @@ public ChangelogResult getResult() {
150138 return reusedResult .setResult (result );
151139 }
152140
153- private KeyValue calculateResult ( List <KeyValue > candidates , @ Nullable KeyValue highLevel ) {
154- mergeFunction . reset ();
141+ private void insertInto ( LinkedList <KeyValue > candidates , KeyValue highLevel ) {
142+ List < KeyValue > newCandidates = new ArrayList <> ();
155143 for (KeyValue candidate : candidates ) {
156144 if (highLevel != null && comparator .compare (highLevel , candidate ) < 0 ) {
157- mergeFunction .add (highLevel );
158- mergeFunction .add (candidate );
145+ newCandidates .add (highLevel );
146+ newCandidates .add (candidate );
159147 highLevel = null ;
160148 } else {
161- mergeFunction .add (candidate );
149+ newCandidates .add (candidate );
162150 }
163151 }
164152 if (highLevel != null ) {
165- mergeFunction .add (highLevel );
153+ newCandidates .add (highLevel );
166154 }
167- return mergeFunction .getResult ();
155+ candidates .clear ();
156+ candidates .addAll (newCandidates );
168157 }
169158
170159 private void setChangelog (@ Nullable KeyValue before , KeyValue after ) {
@@ -197,16 +186,21 @@ private KeyValue replace(KeyValue reused, RowKind valueKind, KeyValue from) {
197186 }
198187
199188 private Comparator <KeyValue > createSequenceComparator (
200- @ Nullable FieldsComparator userDefinedSeqComparator ) {
201- if (userDefinedSeqComparator == null ) {
202- return Comparator .comparingLong (KeyValue ::sequenceNumber );
203- }
204-
189+ @ Nullable FieldsComparator udsComparator ) {
205190 return (o1 , o2 ) -> {
206- int result = userDefinedSeqComparator .compare (o1 .value (), o2 .value ());
207- if (result != 0 ) {
208- return result ;
191+ // For high-level comparison logic (not involving Level 0), only the value of the
192+ // minimum Level should be selected
193+ if (o1 .level () > 0 && o2 .level () > 0 && o1 .level () != o2 .level ()) {
194+ return Integer .compare (o2 .level (), o1 .level ());
195+ }
196+
197+ if (udsComparator != null ) {
198+ int result = udsComparator .compare (o1 .value (), o2 .value ());
199+ if (result != 0 ) {
200+ return result ;
201+ }
209202 }
203+
210204 return Long .compare (o1 .sequenceNumber (), o2 .sequenceNumber ());
211205 };
212206 }
0 commit comments