Skip to content

Fix nullability in ByteBuddyDoFnInvokerFactory and DoFnOutputReceivers #35020

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.transforms;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

Expand All @@ -36,9 +37,6 @@

/** Common {@link OutputReceiver} and {@link MultiOutputReceiver} classes. */
@Internal
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class DoFnOutputReceivers {
private static class RowOutputReceiver<T> implements OutputReceiver<Row> {
WindowedContextOutputReceiver<T> outputReceiver;
Expand Down Expand Up @@ -139,7 +137,7 @@ public <T> OutputReceiver<T> get(TupleTag<T> tag) {
@Override
public <T> OutputReceiver<Row> getRowReceiver(TupleTag<T> tag) {
Coder<T> outputCoder = (Coder<T>) checkNotNull(outputCoders).get(tag);
checkState(outputCoder != null, "No output tag for " + tag);
checkStateNotNull(outputCoder, "No output tag for " + tag);
checkState(
outputCoder instanceof SchemaCoder,
"Output with tag " + tag + " must have a schema in order to call getRowReceiver");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.transforms.reflect;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.sdk.util.common.ReflectHelpers.findClassLoader;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

Expand Down Expand Up @@ -107,14 +108,13 @@
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Primitives;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

/** Dynamically generates a {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
@SuppressWarnings({
"nullness", // TODO(https://github.com/apache/beam/issues/20497)
"rawtypes"
})
@SuppressWarnings({"rawtypes"})
class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {

public static final String SETUP_CONTEXT_PARAMETER_METHOD = "setupContext";
Expand Down Expand Up @@ -271,14 +271,20 @@ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
(DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>>)
getByteBuddyInvokerConstructor(signature).newInstance(fn);

for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) {
invoker.addOnTimerInvoker(
onTimerMethod.id(), OnTimerInvokers.forTimer(fn, onTimerMethod.id()));
if (signature.onTimerMethods() != null) {
for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) {
invoker.addOnTimerInvoker(
onTimerMethod.id(), OnTimerInvokers.forTimer(fn, onTimerMethod.id()));
}
}
for (DoFnSignature.OnTimerFamilyMethod onTimerFamilyMethod :
signature.onTimerFamilyMethods().values()) {
invoker.addOnTimerFamilyInvoker(
onTimerFamilyMethod.id(), OnTimerInvokers.forTimerFamily(fn, onTimerFamilyMethod.id()));

if (signature.onTimerFamilyMethods() != null) {
for (DoFnSignature.OnTimerFamilyMethod onTimerFamilyMethod :
signature.onTimerFamilyMethods().values()) {
invoker.addOnTimerFamilyInvoker(
onTimerFamilyMethod.id(),
OnTimerInvokers.forTimerFamily(fn, onTimerFamilyMethod.id()));
}
}
return invoker;
} catch (InstantiationException
Expand Down Expand Up @@ -313,7 +319,10 @@ private Constructor<?> getByteBuddyInvokerConstructor(DoFnSignature signature) {
/** Default implementation of {@link DoFn.SplitRestriction}, for delegation by bytebuddy. */
public static class DefaultSplitRestriction {
/** Doesn't split the restriction. */
@SuppressWarnings("unused")
@SuppressWarnings({
"unused",
"nullness" // TODO: DoFn parameter is actually never used by outputReceiver
})
public static void invokeSplitRestriction(DoFnInvoker.ArgumentProvider argumentProvider) {
argumentProvider.outputReceiver(null).output(argumentProvider.restriction());
}
Expand All @@ -324,7 +333,7 @@ public static class DefaultTruncateRestriction {

/** Output the current restriction if it is bounded. Otherwise, return null. */
@SuppressWarnings("unused")
public static TruncateResult<?> invokeTruncateRestriction(
public static @Nullable TruncateResult<?> invokeTruncateRestriction(
DoFnInvoker.ArgumentProvider argumentProvider) {
if (argumentProvider.restrictionTracker().isBounded() == IsBounded.BOUNDED) {
return TruncateResult.of(argumentProvider.restriction());
Expand Down Expand Up @@ -374,7 +383,7 @@ Coder<WatermarkEstimatorStateT> invokeGetWatermarkEstimatorStateCoder(
public static class DefaultGetInitialWatermarkEstimatorState {
/** The default watermark estimator state is {@code null}. */
@SuppressWarnings("unused")
public static <InputT, OutputT, WatermarkEstimatorStateT>
public static <InputT, OutputT, WatermarkEstimatorStateT> @Nullable
WatermarkEstimator<WatermarkEstimatorStateT> invokeNewWatermarkEstimator(
DoFnInvoker.ArgumentProvider<InputT, OutputT> argumentProvider) {
return null;
Expand Down Expand Up @@ -404,6 +413,8 @@ public Instant currentWatermark() {
}

@Override
@SuppressWarnings("nullness") // WatermarkEstimatorStateT may or may not allow nullness
// if it does not, then this method must be overridden in the DoFn
public WatermarkEstimatorStateT getState() {
return null;
}
Expand Down Expand Up @@ -529,14 +540,27 @@ private static ClassLoadingStrategy<ClassLoader> getClassLoadingStrategy(Class<?
try {
ClassLoadingStrategy<ClassLoader> strategy;
if (ClassInjector.UsingLookup.isAvailable()) {
@SuppressWarnings(
"nullness") // Method.invoke not annotated to allow null receiver, so we lie
@NonNull
Object nullObjectForStaticMethodInvocation = null;

Class<?> methodHandles = Class.forName("java.lang.invoke.MethodHandles");
Object lookup = methodHandles.getMethod("lookup").invoke(null);
Object lookup =
checkStateNotNull(
methodHandles.getMethod("lookup").invoke(nullObjectForStaticMethodInvocation),
"'MethodHandles.lookup' declared available, but lookup returned null");

Method privateLookupIn =
methodHandles.getMethod(
"privateLookupIn",
Class.class,
Class.forName("java.lang.invoke.MethodHandles$Lookup"));
Object privateLookup = privateLookupIn.invoke(null, targetClass, lookup);

Object privateLookup =
checkStateNotNull(
privateLookupIn.invoke(nullObjectForStaticMethodInvocation, targetClass, lookup),
"MethodHandles.Lookup.privateLookupIn not available");
strategy = ClassLoadingStrategy.UsingLookup.of(privateLookup);
} else if (ClassInjector.UsingReflection.isAvailable()) {
strategy = ClassLoadingStrategy.Default.INJECTION;
Expand All @@ -553,6 +577,10 @@ private static Implementation getRestrictionCoderDelegation(
TypeDescription doFnType, DoFnSignature signature) {
if (signature.processElement().isSplittable()) {
if (signature.getRestrictionCoder() == null) {
checkStateNotNull(
signature.getInitialRestriction(),
"@GetRestrictionCoder provided but @GetInitialRestriction not provided:"
+ " please add @GetInitialRestriction");
return MethodDelegation.to(
new DefaultRestrictionCoder(signature.getInitialRestriction().restrictionT()));
} else {
Expand Down Expand Up @@ -583,7 +611,7 @@ private static Implementation getWatermarkEstimatorStateCoderDelegation(
}

private static Implementation splitRestrictionDelegation(
TypeDescription doFnType, DoFnSignature.SplitRestrictionMethod signature) {
TypeDescription doFnType, DoFnSignature.@Nullable SplitRestrictionMethod signature) {
if (signature == null) {
return MethodDelegation.to(DefaultSplitRestriction.class);
} else {
Expand All @@ -592,7 +620,7 @@ private static Implementation splitRestrictionDelegation(
}

private static Implementation truncateRestrictionDelegation(
TypeDescription doFnType, DoFnSignature.TruncateRestrictionMethod signature) {
TypeDescription doFnType, DoFnSignature.@Nullable TruncateRestrictionMethod signature) {
if (signature == null) {
return MethodDelegation.to(DefaultTruncateRestriction.class);
} else {
Expand Down Expand Up @@ -641,22 +669,22 @@ private static Implementation getSizeDelegation(

/** Delegates to the given method if available, or does nothing. */
private static Implementation delegateOrNoop(
TypeDescription doFnType, DoFnSignature.DoFnMethod method) {
TypeDescription doFnType, DoFnSignature.@Nullable DoFnMethod method) {
return (method == null)
? FixedValue.originType()
: new DoFnMethodDelegation(doFnType, method.targetMethod());
}

/** Delegates method with extra parameters to the given method if available, or does nothing. */
private static Implementation delegateMethodWithExtraParametersOrNoop(
TypeDescription doFnType, DoFnSignature.MethodWithExtraParameters method) {
TypeDescription doFnType, DoFnSignature.@Nullable MethodWithExtraParameters method) {
return (method == null)
? FixedValue.originType()
: new DoFnMethodWithExtraParametersDelegation(doFnType, method);
}

private static Implementation delegateMethodWithExtraParametersOrThrow(
TypeDescription doFnType, DoFnSignature.MethodWithExtraParameters method) {
TypeDescription doFnType, DoFnSignature.@Nullable MethodWithExtraParameters method) {
return (method == null)
? ExceptionMethod.throwing(UnsupportedOperationException.class)
: new DoFnMethodWithExtraParametersDelegation(doFnType, method);
Expand All @@ -673,7 +701,7 @@ static class DoFnMethodDelegation implements Implementation {
private final boolean targetHasReturn;

/** Starts {@code null}, initialized by {@link #prepare(InstrumentedType)}. */
protected @Nullable FieldDescription delegateField;
protected @MonotonicNonNull FieldDescription delegateField;

private final TypeDescription doFnType;

Expand All @@ -686,10 +714,14 @@ public DoFnMethodDelegation(TypeDescription doFnType, Method targetMethod) {

@Override
public InstrumentedType prepare(InstrumentedType instrumentedType) {
TypeDescription.Generic superclassDoFnInvokerBase =
checkStateNotNull(
instrumentedType.getSuperClass(), // always DoFnInvokerBase
"internal error: expected superclass to be DoFnInvokerBase, but was null");

// Remember the field description of the instrumented type.
delegateField =
instrumentedType
.getSuperClass() // always DoFnInvokerBase
superclassDoFnInvokerBase
.getDeclaredFields()
.filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
.getOnly();
Expand Down Expand Up @@ -728,6 +760,9 @@ public ByteCodeAppender.Size apply(
}
}

checkStateNotNull(
delegateField, "delegateField is null - must call prepare(...) before delegation");

StackManipulation manipulation =
new StackManipulation.Compound(
// Push "this" (DoFnInvoker on top of the stack)
Expand Down Expand Up @@ -799,6 +834,9 @@ public DoFnMethodWithExtraParametersDelegation(

@Override
protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
checkStateNotNull(
delegateField, "delegateField is null - must call prepare(...) before delegation");

// Parameters of the wrapper invoker method:
// DoFn.ArgumentProvider
// Parameters of the wrapped DoFn method:
Expand Down Expand Up @@ -1249,7 +1287,11 @@ private void visitFrame(

Type[] localTypes = Type.getArgumentTypes(instrumentedMethod.getDescriptor());
Object[] locals = new Object[1 + localTypes.length + (hasReturnLocal ? 1 : 0)];
locals[0] = instrumentedMethod.getReceiverType().asErasure().getInternalName();
TypeDescription.Generic receiverType =
checkStateNotNull(
instrumentedMethod.getReceiverType(),
"invalid static method used as annotated DoFn method");
locals[0] = receiverType.asErasure().getInternalName();
for (int i = 0; i < localTypes.length; i++) {
locals[i + 1] = describeType(localTypes[i]);
}
Expand Down Expand Up @@ -1308,8 +1350,11 @@ public StackManipulation.Size apply(MethodVisitor mv, Context context) {
mv.visitLabel(wrapEnd);
if (returnVarIndex != null) {
// Drop the return type from the locals
@SuppressWarnings("nullness") // bytebuddy MethodVisitor not annotated, so we lie
@NonNull
String nullSignature = null;
mv.visitLocalVariable(
"res", returnType.getDescriptor(), null, wrapStart, wrapEnd, returnVarIndex);
"res", returnType.getDescriptor(), nullSignature, wrapStart, wrapEnd, returnVarIndex);
}

return size;
Expand Down
Loading