@@ -70,7 +70,7 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
7070 private final Function <Long , BloomFilter .Builder > bfGenerator ;
7171 private final Cache <String , LookupFile > lookupFileCache ;
7272 private final Set <String > ownCachedFiles ;
73- private final Map <Pair <Long , String >, PersistProcessor <T >> schemaIdAndSerIdToProcessors ;
73+ private final Map <Pair <Long , String >, PersistProcessor <T >> schemaIdAndSerVersionToProcessors ;
7474
7575 @ Nullable private RemoteFileDownloader remoteFileDownloader ;
7676
@@ -100,7 +100,7 @@ public LookupLevels(
100100 this .bfGenerator = bfGenerator ;
101101 this .lookupFileCache = lookupFileCache ;
102102 this .ownCachedFiles = new HashSet <>();
103- this .schemaIdAndSerIdToProcessors = new ConcurrentHashMap <>();
103+ this .schemaIdAndSerVersionToProcessors = new ConcurrentHashMap <>();
104104 levels .addDropFileCallback (this );
105105 }
106106
@@ -165,17 +165,17 @@ private T lookup(InternalRow key, DataFileMeta file) throws IOException {
165165 return null ;
166166 }
167167
168- return getOrCreateProcessor (lookupFile .schemaId (), lookupFile .serializerId ())
168+ return getOrCreateProcessor (lookupFile .schemaId (), lookupFile .serVersion ())
169169 .readFromDisk (key , lookupFile .level (), valueBytes , file .fileName ());
170170 }
171171
172- private PersistProcessor <T > getOrCreateProcessor (long schemaId , String serializerId ) {
173- return schemaIdAndSerIdToProcessors .computeIfAbsent (
174- Pair .of (schemaId , serializerId ),
172+ private PersistProcessor <T > getOrCreateProcessor (long schemaId , String serVersion ) {
173+ return schemaIdAndSerVersionToProcessors .computeIfAbsent (
174+ Pair .of (schemaId , serVersion ),
175175 id -> {
176176 RowType fileSchema =
177177 schemaId == currentSchemaId ? null : schemaFunction .apply (schemaId );
178- return processorFactory .create (serializerId , serializerFactory , fileSchema );
178+ return processorFactory .create (serVersion , serializerFactory , fileSchema );
179179 });
180180 }
181181
@@ -186,12 +186,12 @@ public LookupFile createLookupFile(DataFileMeta file) throws IOException {
186186 }
187187
188188 long schemaId = this .currentSchemaId ;
189- String fileSerId = serializerFactory .identifier ();
190- Optional <String > downloadSerId = tryToDownloadRemoteSst (file , localFile );
191- if (downloadSerId .isPresent ()) {
189+ String fileSerVersion = serializerFactory .version ();
190+ Optional <String > downloadSerVersion = tryToDownloadRemoteSst (file , localFile );
191+ if (downloadSerVersion .isPresent ()) {
192192 // use schema id from remote file
193193 schemaId = file .schemaId ();
194- fileSerId = downloadSerId .get ();
194+ fileSerVersion = downloadSerVersion .get ();
195195 } else {
196196 createSstFileFromDataFile (file , localFile );
197197 }
@@ -201,7 +201,7 @@ public LookupFile createLookupFile(DataFileMeta file) throws IOException {
201201 localFile ,
202202 file .level (),
203203 schemaId ,
204- fileSerId ,
204+ fileSerVersion ,
205205 lookupStoreFactory .createReader (localFile ),
206206 () -> ownCachedFiles .remove (file .fileName ()));
207207 }
@@ -219,7 +219,7 @@ private Optional<String> tryToDownloadRemoteSst(DataFileMeta file, File localFil
219219
220220 // validate schema matched, no exception here
221221 try {
222- getOrCreateProcessor (file .schemaId (), remoteSst .serializerId );
222+ getOrCreateProcessor (file .schemaId (), remoteSst .serVersion );
223223 } catch (UnsupportedOperationException e ) {
224224 return Optional .empty ();
225225 }
@@ -229,7 +229,7 @@ private Optional<String> tryToDownloadRemoteSst(DataFileMeta file, File localFil
229229 return Optional .empty ();
230230 }
231231
232- return Optional .of (remoteSst .serializerId );
232+ return Optional .of (remoteSst .serVersion );
233233 }
234234
235235 public void addLocalFile (DataFileMeta file , LookupFile lookupFile ) {
@@ -242,7 +242,7 @@ private void createSstFileFromDataFile(DataFileMeta file, File localFile) throws
242242 localFile , bfGenerator .apply (file .rowCount ()));
243243 RecordReader <KeyValue > reader = fileReaderFactory .apply (file )) {
244244 PersistProcessor <T > processor =
245- getOrCreateProcessor (currentSchemaId , serializerFactory .identifier ());
245+ getOrCreateProcessor (currentSchemaId , serializerFactory .version ());
246246 KeyValue kv ;
247247 if (processor .withPosition ()) {
248248 FileRecordIterator <KeyValue > batch ;
@@ -291,8 +291,8 @@ public Optional<RemoteSstFile> remoteSst(DataFileMeta file) {
291291 return Optional .empty ();
292292 }
293293
294- String serializerId = split [split .length - 2 ];
295- return Optional .of (new RemoteSstFile (sstFileName , serializerId ));
294+ String serVersion = split [split .length - 2 ];
295+ return Optional .of (new RemoteSstFile (sstFileName , serVersion ));
296296 }
297297
298298 public String newRemoteSst (DataFileMeta file , long length ) {
@@ -302,7 +302,7 @@ public String newRemoteSst(DataFileMeta file, long length) {
302302 + "."
303303 + processorFactory .identifier ()
304304 + "."
305- + serializerFactory .identifier ()
305+ + serializerFactory .version ()
306306 + REMOTE_LOOKUP_FILE_SUFFIX ;
307307 }
308308
@@ -314,15 +314,15 @@ public void close() throws IOException {
314314 }
315315 }
316316
317- /** Remote sst file with serializerId . */
317+ /** Remote sst file with serVersion . */
318318 public static class RemoteSstFile {
319319
320320 private final String sstFileName ;
321- private final String serializerId ;
321+ private final String serVersion ;
322322
323- private RemoteSstFile (String sstFileName , String serializerId ) {
323+ private RemoteSstFile (String sstFileName , String serVersion ) {
324324 this .sstFileName = sstFileName ;
325- this .serializerId = serializerId ;
325+ this .serVersion = serVersion ;
326326 }
327327 }
328328}
0 commit comments