1717 */
1818package org .apache .beam .sdk .transforms .reflect ;
1919
20+ import static org .apache .beam .sdk .util .Preconditions .checkStateNotNull ;
2021import static org .apache .beam .sdk .util .common .ReflectHelpers .findClassLoader ;
2122import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions .checkArgument ;
2223
107108import org .apache .beam .sdk .values .TypeDescriptors ;
108109import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .Maps ;
109110import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .primitives .Primitives ;
111+ import org .checkerframework .checker .nullness .qual .MonotonicNonNull ;
112+ import org .checkerframework .checker .nullness .qual .NonNull ;
110113import org .checkerframework .checker .nullness .qual .Nullable ;
111114import org .joda .time .Instant ;
112115
113116/** Dynamically generates a {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
114- @ SuppressWarnings ({
115- "nullness" , // TODO(https://github.com/apache/beam/issues/20497)
116- "rawtypes"
117- })
117+ @ SuppressWarnings ({"rawtypes" })
118118class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
119119
120120 public static final String SETUP_CONTEXT_PARAMETER_METHOD = "setupContext" ;
@@ -271,14 +271,20 @@ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
271271 (DoFnInvokerBase <InputT , OutputT , DoFn <InputT , OutputT >>)
272272 getByteBuddyInvokerConstructor (signature ).newInstance (fn );
273273
274- for (OnTimerMethod onTimerMethod : signature .onTimerMethods ().values ()) {
275- invoker .addOnTimerInvoker (
276- onTimerMethod .id (), OnTimerInvokers .forTimer (fn , onTimerMethod .id ()));
274+ if (signature .onTimerMethods () != null ) {
275+ for (OnTimerMethod onTimerMethod : signature .onTimerMethods ().values ()) {
276+ invoker .addOnTimerInvoker (
277+ onTimerMethod .id (), OnTimerInvokers .forTimer (fn , onTimerMethod .id ()));
278+ }
277279 }
278- for (DoFnSignature .OnTimerFamilyMethod onTimerFamilyMethod :
279- signature .onTimerFamilyMethods ().values ()) {
280- invoker .addOnTimerFamilyInvoker (
281- onTimerFamilyMethod .id (), OnTimerInvokers .forTimerFamily (fn , onTimerFamilyMethod .id ()));
280+
281+ if (signature .onTimerFamilyMethods () != null ) {
282+ for (DoFnSignature .OnTimerFamilyMethod onTimerFamilyMethod :
283+ signature .onTimerFamilyMethods ().values ()) {
284+ invoker .addOnTimerFamilyInvoker (
285+ onTimerFamilyMethod .id (),
286+ OnTimerInvokers .forTimerFamily (fn , onTimerFamilyMethod .id ()));
287+ }
282288 }
283289 return invoker ;
284290 } catch (InstantiationException
@@ -313,7 +319,10 @@ private Constructor<?> getByteBuddyInvokerConstructor(DoFnSignature signature) {
313319 /** Default implementation of {@link DoFn.SplitRestriction}, for delegation by bytebuddy. */
314320 public static class DefaultSplitRestriction {
315321 /** Doesn't split the restriction. */
316- @ SuppressWarnings ("unused" )
322+ @ SuppressWarnings ({
323+ "unused" ,
324+ "nullness" // TODO: DoFn parameter is actually never used by outputReceiver
325+ })
317326 public static void invokeSplitRestriction (DoFnInvoker .ArgumentProvider argumentProvider ) {
318327 argumentProvider .outputReceiver (null ).output (argumentProvider .restriction ());
319328 }
@@ -324,7 +333,7 @@ public static class DefaultTruncateRestriction {
324333
325334 /** Output the current restriction if it is bounded. Otherwise, return null. */
326335 @ SuppressWarnings ("unused" )
327- public static TruncateResult <?> invokeTruncateRestriction (
336+ public static @ Nullable TruncateResult <?> invokeTruncateRestriction (
328337 DoFnInvoker .ArgumentProvider argumentProvider ) {
329338 if (argumentProvider .restrictionTracker ().isBounded () == IsBounded .BOUNDED ) {
330339 return TruncateResult .of (argumentProvider .restriction ());
@@ -374,7 +383,7 @@ Coder<WatermarkEstimatorStateT> invokeGetWatermarkEstimatorStateCoder(
374383 public static class DefaultGetInitialWatermarkEstimatorState {
375384 /** The default watermark estimator state is {@code null}. */
376385 @ SuppressWarnings ("unused" )
377- public static <InputT , OutputT , WatermarkEstimatorStateT >
386+ public static <InputT , OutputT , WatermarkEstimatorStateT > @ Nullable
378387 WatermarkEstimator <WatermarkEstimatorStateT > invokeNewWatermarkEstimator (
379388 DoFnInvoker .ArgumentProvider <InputT , OutputT > argumentProvider ) {
380389 return null ;
@@ -404,6 +413,8 @@ public Instant currentWatermark() {
404413 }
405414
406415 @ Override
416+ @ SuppressWarnings ("nullness" ) // WatermarkEstimatorStateT may or may not allow nullness
417+ // if it does not, then this method must be overridden in the DoFn
407418 public WatermarkEstimatorStateT getState () {
408419 return null ;
409420 }
@@ -529,14 +540,27 @@ private static ClassLoadingStrategy<ClassLoader> getClassLoadingStrategy(Class<?
529540 try {
530541 ClassLoadingStrategy <ClassLoader > strategy ;
531542 if (ClassInjector .UsingLookup .isAvailable ()) {
543+ @ SuppressWarnings (
544+ "nullness" ) // Method.invoke not annotated to allow null receiver, so we lie
545+ @ NonNull
546+ Object nullObjectForStaticMethodInvocation = null ;
547+
532548 Class <?> methodHandles = Class .forName ("java.lang.invoke.MethodHandles" );
533- Object lookup = methodHandles .getMethod ("lookup" ).invoke (null );
549+ Object lookup =
550+ checkStateNotNull (
551+ methodHandles .getMethod ("lookup" ).invoke (nullObjectForStaticMethodInvocation ),
552+ "'MethodHandles.lookup' declared available, but lookup returned null" );
553+
534554 Method privateLookupIn =
535555 methodHandles .getMethod (
536556 "privateLookupIn" ,
537557 Class .class ,
538558 Class .forName ("java.lang.invoke.MethodHandles$Lookup" ));
539- Object privateLookup = privateLookupIn .invoke (null , targetClass , lookup );
559+
560+ Object privateLookup =
561+ checkStateNotNull (
562+ privateLookupIn .invoke (nullObjectForStaticMethodInvocation , targetClass , lookup ),
563+ "MethodHandles.Lookup.privateLookupIn not available" );
540564 strategy = ClassLoadingStrategy .UsingLookup .of (privateLookup );
541565 } else if (ClassInjector .UsingReflection .isAvailable ()) {
542566 strategy = ClassLoadingStrategy .Default .INJECTION ;
@@ -553,6 +577,10 @@ private static Implementation getRestrictionCoderDelegation(
553577 TypeDescription doFnType , DoFnSignature signature ) {
554578 if (signature .processElement ().isSplittable ()) {
555579 if (signature .getRestrictionCoder () == null ) {
580+ checkStateNotNull (
581+ signature .getInitialRestriction (),
582+ "@GetRestrictionCoder provided but @GetInitialRestriction not provided:"
583+ + " please add @GetInitialRestriction" );
556584 return MethodDelegation .to (
557585 new DefaultRestrictionCoder (signature .getInitialRestriction ().restrictionT ()));
558586 } else {
@@ -583,7 +611,7 @@ private static Implementation getWatermarkEstimatorStateCoderDelegation(
583611 }
584612
585613 private static Implementation splitRestrictionDelegation (
586- TypeDescription doFnType , DoFnSignature .SplitRestrictionMethod signature ) {
614+ TypeDescription doFnType , DoFnSignature .@ Nullable SplitRestrictionMethod signature ) {
587615 if (signature == null ) {
588616 return MethodDelegation .to (DefaultSplitRestriction .class );
589617 } else {
@@ -592,7 +620,7 @@ private static Implementation splitRestrictionDelegation(
592620 }
593621
594622 private static Implementation truncateRestrictionDelegation (
595- TypeDescription doFnType , DoFnSignature .TruncateRestrictionMethod signature ) {
623+ TypeDescription doFnType , DoFnSignature .@ Nullable TruncateRestrictionMethod signature ) {
596624 if (signature == null ) {
597625 return MethodDelegation .to (DefaultTruncateRestriction .class );
598626 } else {
@@ -641,22 +669,22 @@ private static Implementation getSizeDelegation(
641669
642670 /** Delegates to the given method if available, or does nothing. */
643671 private static Implementation delegateOrNoop (
644- TypeDescription doFnType , DoFnSignature .DoFnMethod method ) {
672+ TypeDescription doFnType , DoFnSignature .@ Nullable DoFnMethod method ) {
645673 return (method == null )
646674 ? FixedValue .originType ()
647675 : new DoFnMethodDelegation (doFnType , method .targetMethod ());
648676 }
649677
650678 /** Delegates method with extra parameters to the given method if available, or does nothing. */
651679 private static Implementation delegateMethodWithExtraParametersOrNoop (
652- TypeDescription doFnType , DoFnSignature .MethodWithExtraParameters method ) {
680+ TypeDescription doFnType , DoFnSignature .@ Nullable MethodWithExtraParameters method ) {
653681 return (method == null )
654682 ? FixedValue .originType ()
655683 : new DoFnMethodWithExtraParametersDelegation (doFnType , method );
656684 }
657685
658686 private static Implementation delegateMethodWithExtraParametersOrThrow (
659- TypeDescription doFnType , DoFnSignature .MethodWithExtraParameters method ) {
687+ TypeDescription doFnType , DoFnSignature .@ Nullable MethodWithExtraParameters method ) {
660688 return (method == null )
661689 ? ExceptionMethod .throwing (UnsupportedOperationException .class )
662690 : new DoFnMethodWithExtraParametersDelegation (doFnType , method );
@@ -673,7 +701,7 @@ static class DoFnMethodDelegation implements Implementation {
673701 private final boolean targetHasReturn ;
674702
675703 /** Starts {@code null}, initialized by {@link #prepare(InstrumentedType)}. */
676- protected @ Nullable FieldDescription delegateField ;
704+ protected @ MonotonicNonNull FieldDescription delegateField ;
677705
678706 private final TypeDescription doFnType ;
679707
@@ -686,10 +714,14 @@ public DoFnMethodDelegation(TypeDescription doFnType, Method targetMethod) {
686714
687715 @ Override
688716 public InstrumentedType prepare (InstrumentedType instrumentedType ) {
717+ TypeDescription .Generic superclassDoFnInvokerBase =
718+ checkStateNotNull (
719+ instrumentedType .getSuperClass (), // always DoFnInvokerBase
720+ "internal error: expected superclass to be DoFnInvokerBase, but was null" );
721+
689722 // Remember the field description of the instrumented type.
690723 delegateField =
691- instrumentedType
692- .getSuperClass () // always DoFnInvokerBase
724+ superclassDoFnInvokerBase
693725 .getDeclaredFields ()
694726 .filter (ElementMatchers .named (FN_DELEGATE_FIELD_NAME ))
695727 .getOnly ();
@@ -728,6 +760,9 @@ public ByteCodeAppender.Size apply(
728760 }
729761 }
730762
763+ checkStateNotNull (
764+ delegateField , "delegateField is null - must call prepare(...) before delegation" );
765+
731766 StackManipulation manipulation =
732767 new StackManipulation .Compound (
733768 // Push "this" (DoFnInvoker on top of the stack)
@@ -799,6 +834,9 @@ public DoFnMethodWithExtraParametersDelegation(
799834
800835 @ Override
801836 protected StackManipulation beforeDelegation (MethodDescription instrumentedMethod ) {
837+ checkStateNotNull (
838+ delegateField , "delegateField is null - must call prepare(...) before delegation" );
839+
802840 // Parameters of the wrapper invoker method:
803841 // DoFn.ArgumentProvider
804842 // Parameters of the wrapped DoFn method:
@@ -1249,7 +1287,11 @@ private void visitFrame(
12491287
12501288 Type [] localTypes = Type .getArgumentTypes (instrumentedMethod .getDescriptor ());
12511289 Object [] locals = new Object [1 + localTypes .length + (hasReturnLocal ? 1 : 0 )];
1252- locals [0 ] = instrumentedMethod .getReceiverType ().asErasure ().getInternalName ();
1290+ TypeDescription .Generic receiverType =
1291+ checkStateNotNull (
1292+ instrumentedMethod .getReceiverType (),
1293+ "invalid static method used as annotated DoFn method" );
1294+ locals [0 ] = receiverType .asErasure ().getInternalName ();
12531295 for (int i = 0 ; i < localTypes .length ; i ++) {
12541296 locals [i + 1 ] = describeType (localTypes [i ]);
12551297 }
@@ -1308,8 +1350,11 @@ public StackManipulation.Size apply(MethodVisitor mv, Context context) {
13081350 mv .visitLabel (wrapEnd );
13091351 if (returnVarIndex != null ) {
13101352 // Drop the return type from the locals
1353+ @ SuppressWarnings ("nullness" ) // bytebuddy MethodVisitor not annotated, so we lie
1354+ @ NonNull
1355+ String nullSignature = null ;
13111356 mv .visitLocalVariable (
1312- "res" , returnType .getDescriptor (), null , wrapStart , wrapEnd , returnVarIndex );
1357+ "res" , returnType .getDescriptor (), nullSignature , wrapStart , wrapEnd , returnVarIndex );
13131358 }
13141359
13151360 return size ;
0 commit comments