Skip to content

Commit ae45832

Browse files
X-czhhuwh
authored andcommitted
[FLINK-37722] Eliminate redundant field initialization of PojoSerializer
1 parent d30cf4d commit ae45832

File tree

1 file changed

+42
-53
lines changed

1 file changed

+42
-53
lines changed

flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java

Lines changed: 42 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -272,12 +272,7 @@ public T copy(T from) {
272272
"Error during POJO copy, this should not happen since we check the fields before.");
273273
}
274274
} else if (actualType == clazz) {
275-
T target;
276-
try {
277-
target = instantiateRaw();
278-
} catch (Throwable t) {
279-
throw new RuntimeException("Cannot instantiate class.", t);
280-
}
275+
T target = instantiateRaw();
281276
// no subclass
282277
try {
283278
for (int i = 0; i < numFields; i++) {
@@ -442,27 +437,17 @@ public T deserialize(DataInputView source) throws IOException {
442437
return null;
443438
}
444439

445-
T target;
446-
447-
Class<?> actualSubclass = null;
448-
TypeSerializer subclassSerializer = null;
449-
450440
if ((flags & IS_SUBCLASS) != 0) {
451441
String subclassName = source.readUTF();
452-
actualSubclass = getSubclassByName(subclassName);
453-
subclassSerializer = getSubclassSerializer(actualSubclass);
454-
target = (T) subclassSerializer.createInstance();
455-
// also initialize fields for which the subclass serializer is not responsible
456-
initializeFields(target);
457-
} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
442+
Class<?> actualSubclass = getSubclassByName(subclassName);
443+
TypeSerializer subclassSerializer = getSubclassSerializer(actualSubclass);
444+
return (T) subclassSerializer.deserialize(source);
445+
}
458446

447+
if ((flags & IS_TAGGED_SUBCLASS) != 0) {
459448
int subclassTag = source.readByte();
460-
subclassSerializer = registeredSerializers[subclassTag];
461-
target = (T) subclassSerializer.createInstance();
462-
// also initialize fields for which the subclass serializer is not responsible
463-
initializeFields(target);
464-
} else {
465-
target = createInstance();
449+
TypeSerializer subclassSerializer = registeredSerializers[subclassTag];
450+
return (T) subclassSerializer.deserialize(source);
466451
}
467452

468453
if (isRecord()) {
@@ -474,8 +459,11 @@ public T deserialize(DataInputView source) throws IOException {
474459
builder.setField(i, fieldValue);
475460
}
476461
}
477-
target = builder.build();
478-
} else if ((flags & NO_SUBCLASS) != 0) {
462+
return builder.build();
463+
}
464+
465+
if ((flags & NO_SUBCLASS) != 0) {
466+
T target = instantiateRaw();
479467
try {
480468
for (int i = 0; i < numFields; i++) {
481469
boolean isNull = source.readBoolean();
@@ -489,12 +477,10 @@ public T deserialize(DataInputView source) throws IOException {
489477
"Error during POJO copy, this should not happen since we check the fields before.",
490478
e);
491479
}
492-
} else {
493-
if (subclassSerializer != null) {
494-
target = (T) subclassSerializer.deserialize(target, source);
495-
}
480+
return target;
496481
}
497-
return target;
482+
483+
throw new RuntimeException("Unknown POJO flags, this should not happen.");
498484
}
499485

500486
@Override
@@ -507,36 +493,36 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
507493
return null;
508494
}
509495

510-
Class<?> subclass = null;
511-
TypeSerializer subclassSerializer = null;
512496
if ((flags & IS_SUBCLASS) != 0) {
513497
String subclassName = source.readUTF();
514-
subclass = getSubclassByName(subclassName);
515-
subclassSerializer = getSubclassSerializer(subclass);
498+
Class<?> subclass = getSubclassByName(subclassName);
499+
TypeSerializer subclassSerializer = getSubclassSerializer(subclass);
516500

517501
if (reuse == null || subclass != reuse.getClass()) {
518502
// cannot reuse
519-
reuse = (T) subclassSerializer.createInstance();
520-
// also initialize fields for which the subclass serializer is not responsible
521-
initializeFields(reuse);
503+
return (T) subclassSerializer.deserialize(source);
504+
} else {
505+
return (T) subclassSerializer.deserialize(reuse, source);
522506
}
523-
} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
507+
}
508+
509+
if ((flags & IS_TAGGED_SUBCLASS) != 0) {
524510
int subclassTag = source.readByte();
525-
subclassSerializer = registeredSerializers[subclassTag];
511+
TypeSerializer subclassSerializer = registeredSerializers[subclassTag];
526512

527513
if (reuse == null || ((PojoSerializer) subclassSerializer).clazz != reuse.getClass()) {
528514
// cannot reuse
529-
reuse = (T) subclassSerializer.createInstance();
530-
// also initialize fields for which the subclass serializer is not responsible
531-
initializeFields(reuse);
532-
}
533-
} else {
534-
if (reuse == null || clazz != reuse.getClass()) {
535-
reuse = createInstance();
515+
return (T) subclassSerializer.deserialize(source);
516+
} else {
517+
return (T) subclassSerializer.deserialize(reuse, source);
536518
}
537519
}
538520

539521
if (isRecord()) {
522+
if (reuse != null && clazz != reuse.getClass()) {
523+
// cannot reuse, and cannot directly instantiate a record either
524+
reuse = null;
525+
}
540526
try {
541527
JavaRecordBuilderFactory<T>.JavaRecordBuilder builder = recordFactory.newBuilder();
542528
for (int i = 0; i < numFields; i++) {
@@ -555,13 +541,19 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
555541
}
556542
}
557543

558-
reuse = builder.build();
544+
return builder.build();
559545
} catch (IllegalAccessException e) {
560546
throw new RuntimeException(
561547
"Error during POJO copy, this should not happen since we check the fields before.",
562548
e);
563549
}
564-
} else if ((flags & NO_SUBCLASS) != 0) {
550+
}
551+
552+
if ((flags & NO_SUBCLASS) != 0) {
553+
if (reuse == null || clazz != reuse.getClass()) {
554+
// cannot reuse
555+
reuse = instantiateRaw();
556+
}
565557
try {
566558
for (int i = 0; i < numFields; i++) {
567559
boolean isNull = source.readBoolean();
@@ -582,13 +574,10 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
582574
"Error during POJO copy, this should not happen since we check the fields before.",
583575
e);
584576
}
585-
} else {
586-
if (subclassSerializer != null) {
587-
reuse = (T) subclassSerializer.deserialize(reuse, source);
588-
}
577+
return reuse;
589578
}
590579

591-
return reuse;
580+
throw new RuntimeException("Unknown POJO flags, this should not happen.");
592581
}
593582

594583
private Object deserializeField(Object reuseField, int i, DataInputView source)

0 commit comments

Comments
 (0)