17
17
*/
18
18
package org .apache .beam .sdk .transforms .reflect ;
19
19
20
+ import static org .apache .beam .sdk .util .Preconditions .checkStateNotNull ;
20
21
import static org .apache .beam .sdk .util .common .ReflectHelpers .findClassLoader ;
21
22
import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions .checkArgument ;
22
23
107
108
import org .apache .beam .sdk .values .TypeDescriptors ;
108
109
import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .Maps ;
109
110
import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .primitives .Primitives ;
111
+ import org .checkerframework .checker .nullness .qual .MonotonicNonNull ;
112
+ import org .checkerframework .checker .nullness .qual .NonNull ;
110
113
import org .checkerframework .checker .nullness .qual .Nullable ;
111
114
import org .joda .time .Instant ;
112
115
113
116
/** Dynamically generates a {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
114
- @ SuppressWarnings ({
115
- "nullness" , // TODO(https://github.com/apache/beam/issues/20497)
116
- "rawtypes"
117
- })
117
+ @ SuppressWarnings ({"rawtypes" })
118
118
class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
119
119
120
120
public static final String SETUP_CONTEXT_PARAMETER_METHOD = "setupContext" ;
@@ -271,14 +271,20 @@ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
271
271
(DoFnInvokerBase <InputT , OutputT , DoFn <InputT , OutputT >>)
272
272
getByteBuddyInvokerConstructor (signature ).newInstance (fn );
273
273
274
- for (OnTimerMethod onTimerMethod : signature .onTimerMethods ().values ()) {
275
- invoker .addOnTimerInvoker (
276
- onTimerMethod .id (), OnTimerInvokers .forTimer (fn , onTimerMethod .id ()));
274
+ if (signature .onTimerMethods () != null ) {
275
+ for (OnTimerMethod onTimerMethod : signature .onTimerMethods ().values ()) {
276
+ invoker .addOnTimerInvoker (
277
+ onTimerMethod .id (), OnTimerInvokers .forTimer (fn , onTimerMethod .id ()));
278
+ }
277
279
}
278
- for (DoFnSignature .OnTimerFamilyMethod onTimerFamilyMethod :
279
- signature .onTimerFamilyMethods ().values ()) {
280
- invoker .addOnTimerFamilyInvoker (
281
- onTimerFamilyMethod .id (), OnTimerInvokers .forTimerFamily (fn , onTimerFamilyMethod .id ()));
280
+
281
+ if (signature .onTimerFamilyMethods () != null ) {
282
+ for (DoFnSignature .OnTimerFamilyMethod onTimerFamilyMethod :
283
+ signature .onTimerFamilyMethods ().values ()) {
284
+ invoker .addOnTimerFamilyInvoker (
285
+ onTimerFamilyMethod .id (),
286
+ OnTimerInvokers .forTimerFamily (fn , onTimerFamilyMethod .id ()));
287
+ }
282
288
}
283
289
return invoker ;
284
290
} catch (InstantiationException
@@ -313,7 +319,10 @@ private Constructor<?> getByteBuddyInvokerConstructor(DoFnSignature signature) {
313
319
/** Default implementation of {@link DoFn.SplitRestriction}, for delegation by bytebuddy. */
314
320
public static class DefaultSplitRestriction {
315
321
/** Doesn't split the restriction. */
316
- @ SuppressWarnings ("unused" )
322
+ @ SuppressWarnings ({
323
+ "unused" ,
324
+ "nullness" // TODO: DoFn parameter is actually never used by outputReceiver
325
+ })
317
326
public static void invokeSplitRestriction (DoFnInvoker .ArgumentProvider argumentProvider ) {
318
327
argumentProvider .outputReceiver (null ).output (argumentProvider .restriction ());
319
328
}
@@ -324,7 +333,7 @@ public static class DefaultTruncateRestriction {
324
333
325
334
/** Output the current restriction if it is bounded. Otherwise, return null. */
326
335
@ SuppressWarnings ("unused" )
327
- public static TruncateResult <?> invokeTruncateRestriction (
336
+ public static @ Nullable TruncateResult <?> invokeTruncateRestriction (
328
337
DoFnInvoker .ArgumentProvider argumentProvider ) {
329
338
if (argumentProvider .restrictionTracker ().isBounded () == IsBounded .BOUNDED ) {
330
339
return TruncateResult .of (argumentProvider .restriction ());
@@ -374,7 +383,7 @@ Coder<WatermarkEstimatorStateT> invokeGetWatermarkEstimatorStateCoder(
374
383
public static class DefaultGetInitialWatermarkEstimatorState {
375
384
/** The default watermark estimator state is {@code null}. */
376
385
@ SuppressWarnings ("unused" )
377
- public static <InputT , OutputT , WatermarkEstimatorStateT >
386
+ public static <InputT , OutputT , WatermarkEstimatorStateT > @ Nullable
378
387
WatermarkEstimator <WatermarkEstimatorStateT > invokeNewWatermarkEstimator (
379
388
DoFnInvoker .ArgumentProvider <InputT , OutputT > argumentProvider ) {
380
389
return null ;
@@ -404,6 +413,8 @@ public Instant currentWatermark() {
404
413
}
405
414
406
415
@ Override
416
+ @ SuppressWarnings ("nullness" ) // WatermarkEstimatorStateT may or may not allow nullness
417
+ // if it does not, then this method must be overridden in the DoFn
407
418
public WatermarkEstimatorStateT getState () {
408
419
return null ;
409
420
}
@@ -529,14 +540,27 @@ private static ClassLoadingStrategy<ClassLoader> getClassLoadingStrategy(Class<?
529
540
try {
530
541
ClassLoadingStrategy <ClassLoader > strategy ;
531
542
if (ClassInjector .UsingLookup .isAvailable ()) {
543
+ @ SuppressWarnings (
544
+ "nullness" ) // Method.invoke not annotated to allow null receiver, so we lie
545
+ @ NonNull
546
+ Object nullObjectForStaticMethodInvocation = null ;
547
+
532
548
Class <?> methodHandles = Class .forName ("java.lang.invoke.MethodHandles" );
533
- Object lookup = methodHandles .getMethod ("lookup" ).invoke (null );
549
+ Object lookup =
550
+ checkStateNotNull (
551
+ methodHandles .getMethod ("lookup" ).invoke (nullObjectForStaticMethodInvocation ),
552
+ "'MethodHandles.lookup' declared available, but lookup returned null" );
553
+
534
554
Method privateLookupIn =
535
555
methodHandles .getMethod (
536
556
"privateLookupIn" ,
537
557
Class .class ,
538
558
Class .forName ("java.lang.invoke.MethodHandles$Lookup" ));
539
- Object privateLookup = privateLookupIn .invoke (null , targetClass , lookup );
559
+
560
+ Object privateLookup =
561
+ checkStateNotNull (
562
+ privateLookupIn .invoke (nullObjectForStaticMethodInvocation , targetClass , lookup ),
563
+ "MethodHandles.Lookup.privateLookupIn not available" );
540
564
strategy = ClassLoadingStrategy .UsingLookup .of (privateLookup );
541
565
} else if (ClassInjector .UsingReflection .isAvailable ()) {
542
566
strategy = ClassLoadingStrategy .Default .INJECTION ;
@@ -553,6 +577,10 @@ private static Implementation getRestrictionCoderDelegation(
553
577
TypeDescription doFnType , DoFnSignature signature ) {
554
578
if (signature .processElement ().isSplittable ()) {
555
579
if (signature .getRestrictionCoder () == null ) {
580
+ checkStateNotNull (
581
+ signature .getInitialRestriction (),
582
+ "@GetRestrictionCoder provided but @GetInitialRestriction not provided:"
583
+ + " please add @GetInitialRestriction" );
556
584
return MethodDelegation .to (
557
585
new DefaultRestrictionCoder (signature .getInitialRestriction ().restrictionT ()));
558
586
} else {
@@ -583,7 +611,7 @@ private static Implementation getWatermarkEstimatorStateCoderDelegation(
583
611
}
584
612
585
613
private static Implementation splitRestrictionDelegation (
586
- TypeDescription doFnType , DoFnSignature .SplitRestrictionMethod signature ) {
614
+ TypeDescription doFnType , DoFnSignature .@ Nullable SplitRestrictionMethod signature ) {
587
615
if (signature == null ) {
588
616
return MethodDelegation .to (DefaultSplitRestriction .class );
589
617
} else {
@@ -592,7 +620,7 @@ private static Implementation splitRestrictionDelegation(
592
620
}
593
621
594
622
private static Implementation truncateRestrictionDelegation (
595
- TypeDescription doFnType , DoFnSignature .TruncateRestrictionMethod signature ) {
623
+ TypeDescription doFnType , DoFnSignature .@ Nullable TruncateRestrictionMethod signature ) {
596
624
if (signature == null ) {
597
625
return MethodDelegation .to (DefaultTruncateRestriction .class );
598
626
} else {
@@ -641,22 +669,22 @@ private static Implementation getSizeDelegation(
641
669
642
670
/** Delegates to the given method if available, or does nothing. */
643
671
private static Implementation delegateOrNoop (
644
- TypeDescription doFnType , DoFnSignature .DoFnMethod method ) {
672
+ TypeDescription doFnType , DoFnSignature .@ Nullable DoFnMethod method ) {
645
673
return (method == null )
646
674
? FixedValue .originType ()
647
675
: new DoFnMethodDelegation (doFnType , method .targetMethod ());
648
676
}
649
677
650
678
/** Delegates method with extra parameters to the given method if available, or does nothing. */
651
679
private static Implementation delegateMethodWithExtraParametersOrNoop (
652
- TypeDescription doFnType , DoFnSignature .MethodWithExtraParameters method ) {
680
+ TypeDescription doFnType , DoFnSignature .@ Nullable MethodWithExtraParameters method ) {
653
681
return (method == null )
654
682
? FixedValue .originType ()
655
683
: new DoFnMethodWithExtraParametersDelegation (doFnType , method );
656
684
}
657
685
658
686
private static Implementation delegateMethodWithExtraParametersOrThrow (
659
- TypeDescription doFnType , DoFnSignature .MethodWithExtraParameters method ) {
687
+ TypeDescription doFnType , DoFnSignature .@ Nullable MethodWithExtraParameters method ) {
660
688
return (method == null )
661
689
? ExceptionMethod .throwing (UnsupportedOperationException .class )
662
690
: new DoFnMethodWithExtraParametersDelegation (doFnType , method );
@@ -673,7 +701,7 @@ static class DoFnMethodDelegation implements Implementation {
673
701
private final boolean targetHasReturn ;
674
702
675
703
/** Starts {@code null}, initialized by {@link #prepare(InstrumentedType)}. */
676
- protected @ Nullable FieldDescription delegateField ;
704
+ protected @ MonotonicNonNull FieldDescription delegateField ;
677
705
678
706
private final TypeDescription doFnType ;
679
707
@@ -686,10 +714,14 @@ public DoFnMethodDelegation(TypeDescription doFnType, Method targetMethod) {
686
714
687
715
@ Override
688
716
public InstrumentedType prepare (InstrumentedType instrumentedType ) {
717
+ TypeDescription .Generic superclassDoFnInvokerBase =
718
+ checkStateNotNull (
719
+ instrumentedType .getSuperClass (), // always DoFnInvokerBase
720
+ "internal error: expected superclass to be DoFnInvokerBase, but was null" );
721
+
689
722
// Remember the field description of the instrumented type.
690
723
delegateField =
691
- instrumentedType
692
- .getSuperClass () // always DoFnInvokerBase
724
+ superclassDoFnInvokerBase
693
725
.getDeclaredFields ()
694
726
.filter (ElementMatchers .named (FN_DELEGATE_FIELD_NAME ))
695
727
.getOnly ();
@@ -728,6 +760,9 @@ public ByteCodeAppender.Size apply(
728
760
}
729
761
}
730
762
763
+ checkStateNotNull (
764
+ delegateField , "delegateField is null - must call prepare(...) before delegation" );
765
+
731
766
StackManipulation manipulation =
732
767
new StackManipulation .Compound (
733
768
// Push "this" (DoFnInvoker on top of the stack)
@@ -799,6 +834,9 @@ public DoFnMethodWithExtraParametersDelegation(
799
834
800
835
@ Override
801
836
protected StackManipulation beforeDelegation (MethodDescription instrumentedMethod ) {
837
+ checkStateNotNull (
838
+ delegateField , "delegateField is null - must call prepare(...) before delegation" );
839
+
802
840
// Parameters of the wrapper invoker method:
803
841
// DoFn.ArgumentProvider
804
842
// Parameters of the wrapped DoFn method:
@@ -1249,7 +1287,11 @@ private void visitFrame(
1249
1287
1250
1288
Type [] localTypes = Type .getArgumentTypes (instrumentedMethod .getDescriptor ());
1251
1289
Object [] locals = new Object [1 + localTypes .length + (hasReturnLocal ? 1 : 0 )];
1252
- locals [0 ] = instrumentedMethod .getReceiverType ().asErasure ().getInternalName ();
1290
+ TypeDescription .Generic receiverType =
1291
+ checkStateNotNull (
1292
+ instrumentedMethod .getReceiverType (),
1293
+ "invalid static method used as annotated DoFn method" );
1294
+ locals [0 ] = receiverType .asErasure ().getInternalName ();
1253
1295
for (int i = 0 ; i < localTypes .length ; i ++) {
1254
1296
locals [i + 1 ] = describeType (localTypes [i ]);
1255
1297
}
@@ -1308,8 +1350,11 @@ public StackManipulation.Size apply(MethodVisitor mv, Context context) {
1308
1350
mv .visitLabel (wrapEnd );
1309
1351
if (returnVarIndex != null ) {
1310
1352
// Drop the return type from the locals
1353
+ @ SuppressWarnings ("nullness" ) // bytebuddy MethodVisitor not annotated, so we lie
1354
+ @ NonNull
1355
+ String nullSignature = null ;
1311
1356
mv .visitLocalVariable (
1312
- "res" , returnType .getDescriptor (), null , wrapStart , wrapEnd , returnVarIndex );
1357
+ "res" , returnType .getDescriptor (), nullSignature , wrapStart , wrapEnd , returnVarIndex );
1313
1358
}
1314
1359
1315
1360
return size ;
0 commit comments