Skip to content

Commit 16598da

Browse files
[improve][fn] Implement PIP-412: Support setting messagePayloadProcessor in Pulsar Functions and Sinks (#24163)
1 parent e7eb7b3 commit 16598da

File tree

16 files changed

+573
-21
lines changed

16 files changed

+573
-21
lines changed

Diff for: pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ConsumerConfig.java

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class ConsumerConfig {
4545
private Map<String, String> consumerProperties = new HashMap<>();
4646
private Integer receiverQueueSize;
4747
private CryptoConfig cryptoConfig;
48+
private MessagePayloadProcessorConfig messagePayloadProcessorConfig;
4849
private boolean poolMessages;
4950

5051
public ConsumerConfig(String schemaType) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.common.functions;
20+
21+
import java.util.Map;
22+
import lombok.AllArgsConstructor;
23+
import lombok.Builder;
24+
import lombok.Data;
25+
import lombok.EqualsAndHashCode;
26+
import lombok.NoArgsConstructor;
27+
28+
@Data
29+
@Builder
30+
@NoArgsConstructor
31+
@AllArgsConstructor
32+
@EqualsAndHashCode
33+
public class MessagePayloadProcessorConfig {
34+
private String className;
35+
private Map<String, Object> config;
36+
}

Diff for: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java

+5
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import org.apache.pulsar.functions.utils.BatchingUtils;
105105
import org.apache.pulsar.functions.utils.CryptoUtils;
106106
import org.apache.pulsar.functions.utils.FunctionCommon;
107+
import org.apache.pulsar.functions.utils.MessagePayloadProcessorUtils;
107108
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
108109
import org.apache.pulsar.io.core.Sink;
109110
import org.apache.pulsar.io.core.Source;
@@ -793,6 +794,10 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
793794
if (conf.hasCryptoSpec()) {
794795
consumerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()));
795796
}
797+
if (conf.hasMessagePayloadProcessorSpec()) {
798+
consumerConfig.setMessagePayloadProcessorConfig(
799+
MessagePayloadProcessorUtils.convertFromSpec(conf.getMessagePayloadProcessorSpec()));
800+
}
796801
consumerConfig.setPoolMessages(conf.getPoolMessages());
797802

798803
topicSchema.put(topic, consumerConfig);

Diff for: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java

+9
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.pulsar.common.functions.FunctionConfig;
3838
import org.apache.pulsar.functions.api.Record;
3939
import org.apache.pulsar.functions.utils.CryptoUtils;
40+
import org.apache.pulsar.functions.utils.MessagePayloadProcessorUtils;
4041
import org.apache.pulsar.io.core.Source;
4142
import org.bouncycastle.jce.provider.BouncyCastleProvider;
4243

@@ -82,6 +83,9 @@ protected ConsumerBuilder<T> createConsumeBuilder(String topic, PulsarSourceCons
8283
if (conf.getCryptoKeyReader() != null) {
8384
cb = cb.cryptoKeyReader(conf.getCryptoKeyReader());
8485
}
86+
if (conf.getMessagePayloadProcessor() != null) {
87+
cb = cb.messagePayloadProcessor(conf.getMessagePayloadProcessor());
88+
}
8589
if (conf.getConsumerCryptoFailureAction() != null) {
8690
cb = cb.cryptoFailureAction(conf.getConsumerCryptoFailureAction());
8791
}
@@ -190,6 +194,11 @@ protected PulsarSourceConsumerConfig<T> buildPulsarSourceConsumerConfig(String t
190194
conf.getCryptoConfig().getCryptoKeyReaderClassName(),
191195
conf.getCryptoConfig().getCryptoKeyReaderConfig(), functionClassLoader));
192196
}
197+
if (conf.getMessagePayloadProcessorConfig() != null) {
198+
consumerConfBuilder.messagePayloadProcessor(MessagePayloadProcessorUtils.getMessagePayloadProcessorInstance(
199+
conf.getMessagePayloadProcessorConfig().getClassName(),
200+
conf.getMessagePayloadProcessorConfig().getConfig(), functionClassLoader));
201+
}
193202
consumerConfBuilder.poolMessages(conf.isPoolMessages());
194203
return consumerConfBuilder.build();
195204
}

Diff for: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConsumerConfig.java

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import lombok.Data;
2424
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
2525
import org.apache.pulsar.client.api.CryptoKeyReader;
26+
import org.apache.pulsar.client.api.MessagePayloadProcessor;
2627
import org.apache.pulsar.client.api.Schema;
2728

2829
@Data
@@ -33,6 +34,7 @@ class PulsarSourceConsumerConfig<T> {
3334
private Integer receiverQueueSize;
3435
private Map<String, String> consumerProperties;
3536
private CryptoKeyReader cryptoKeyReader;
37+
private MessagePayloadProcessor messagePayloadProcessor;
3638
private ConsumerCryptoFailureAction consumerCryptoFailureAction;
3739
private boolean poolMessages;
3840
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.functions.api.examples;
20+
21+
22+
import java.util.Map;
23+
import java.util.function.Consumer;
24+
import java.util.stream.Collectors;
25+
import lombok.extern.slf4j.Slf4j;
26+
import org.apache.pulsar.client.api.Message;
27+
import org.apache.pulsar.client.api.MessagePayload;
28+
import org.apache.pulsar.client.api.MessagePayloadContext;
29+
import org.apache.pulsar.client.api.MessagePayloadProcessor;
30+
import org.apache.pulsar.client.api.Schema;
31+
32+
@Slf4j
33+
public class TestPayloadProcessor implements MessagePayloadProcessor {
34+
public TestPayloadProcessor() {
35+
log.info("TestPayloadProcessor constructor without configs");
36+
}
37+
38+
public TestPayloadProcessor(Map<String, Object> conf) {
39+
String configs = conf.entrySet().stream()
40+
.map(entry -> entry.getKey() + "=" + entry.getValue())
41+
.collect(Collectors.joining(", "));
42+
log.info("TestPayloadProcessor constructor with configs {}", configs);
43+
}
44+
45+
@Override
46+
public <T> void process(MessagePayload payload, MessagePayloadContext context, Schema<T> schema,
47+
Consumer<Message<T>> messageConsumer) throws Exception {
48+
log.info("Processing message using TestPayloadProcessor");
49+
if (context.isBatch()) {
50+
final int numMessages = context.getNumMessages();
51+
for (int i = 0; i < numMessages; i++) {
52+
messageConsumer.accept(context.getMessageAt(i, numMessages, payload, true, schema));
53+
}
54+
} else {
55+
messageConsumer.accept(context.asSingleMessage(payload, schema));
56+
}
57+
}
58+
}

Diff for: pulsar-functions/proto/src/main/proto/Function.proto

+6
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ message ConsumerSpec {
112112
map<string, string> consumerProperties = 6;
113113
CryptoSpec cryptoSpec = 7;
114114
bool poolMessages = 8;
115+
MessagePayloadProcessorSpec messagePayloadProcessorSpec = 9;
115116
}
116117

117118
message ProducerSpec {
@@ -148,6 +149,11 @@ message CryptoSpec {
148149
FailureAction consumerCryptoFailureAction = 5;
149150
}
150151

152+
message MessagePayloadProcessorSpec {
153+
string className = 1;
154+
string configs = 2;
155+
}
156+
151157
message BatchingSpec {
152158
bool enabled = 1;
153159
int32 batchingMaxPublishDelayMs = 2;

Diff for: pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java

+17
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,10 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
159159
if (consumerConf.getCryptoConfig() != null) {
160160
bldr.setCryptoSpec(CryptoUtils.convert(consumerConf.getCryptoConfig()));
161161
}
162+
if (consumerConf.getMessagePayloadProcessorConfig() != null) {
163+
bldr.setMessagePayloadProcessorSpec(
164+
MessagePayloadProcessorUtils.convert(consumerConf.getMessagePayloadProcessorConfig()));
165+
}
162166
bldr.putAllConsumerProperties(consumerConf.getConsumerProperties());
163167
bldr.setPoolMessages(consumerConf.isPoolMessages());
164168
sourceSpecBuilder.putInputSpecs(topicName, bldr.build());
@@ -409,6 +413,10 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails)
409413
if (input.getValue().hasCryptoSpec()) {
410414
consumerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(input.getValue().getCryptoSpec()));
411415
}
416+
if (input.getValue().hasMessagePayloadProcessorSpec()) {
417+
consumerConfig.setMessagePayloadProcessorConfig(MessagePayloadProcessorUtils.convertFromSpec(
418+
input.getValue().getMessagePayloadProcessorSpec()));
419+
}
412420
consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern());
413421
consumerConfig.setSchemaProperties(input.getValue().getSchemaPropertiesMap());
414422
consumerConfig.setPoolMessages(input.getValue().getPoolMessages());
@@ -684,6 +692,10 @@ public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfi
684692
ValidatorUtils.validateCryptoKeyReader(conf.getCryptoConfig(),
685693
validatableFunctionPackage.getTypePool(), false);
686694
}
695+
if (conf.getMessagePayloadProcessorConfig() != null) {
696+
ValidatorUtils.validateMessagePayloadProcessor(conf.getMessagePayloadProcessorConfig(),
697+
validatableFunctionPackage.getTypePool());
698+
}
687699
});
688700
}
689701

@@ -900,6 +912,11 @@ public static void doCommonChecks(FunctionConfig functionConfig) {
900912
throw new IllegalArgumentException(
901913
"CryptoKeyReader class name required");
902914
}
915+
if (conf.getMessagePayloadProcessorConfig() != null && isBlank(
916+
conf.getMessagePayloadProcessorConfig().getClassName())) {
917+
throw new IllegalArgumentException(
918+
"MessagePayloadProcessor class name required");
919+
}
903920
});
904921
}
905922

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.functions.utils;
20+
21+
import static org.apache.commons.lang.StringUtils.isEmpty;
22+
import com.google.gson.Gson;
23+
import com.google.gson.reflect.TypeToken;
24+
import java.lang.reflect.Constructor;
25+
import java.lang.reflect.InvocationTargetException;
26+
import java.lang.reflect.Type;
27+
import java.util.Map;
28+
import org.apache.pulsar.client.api.MessagePayloadProcessor;
29+
import org.apache.pulsar.common.functions.MessagePayloadProcessorConfig;
30+
import org.apache.pulsar.common.util.ClassLoaderUtils;
31+
import org.apache.pulsar.functions.proto.Function;
32+
33+
public class MessagePayloadProcessorUtils {
34+
public static MessagePayloadProcessor getMessagePayloadProcessorInstance(String className,
35+
Map<String, Object> configs,
36+
ClassLoader classLoader) {
37+
Class<?> payloadProcessorClass;
38+
try {
39+
payloadProcessorClass = ClassLoaderUtils.loadClass(className, classLoader);
40+
} catch (ClassNotFoundException e) {
41+
throw new RuntimeException(
42+
String.format("Failed to load message payload processor class %sx", className));
43+
}
44+
45+
try {
46+
if (configs == null || configs.isEmpty()) {
47+
Constructor<?> ctor = payloadProcessorClass.getConstructor();
48+
return (MessagePayloadProcessor) ctor.newInstance();
49+
} else {
50+
Constructor<?> ctor = payloadProcessorClass.getConstructor(Map.class);
51+
return (MessagePayloadProcessor) ctor.newInstance(configs);
52+
}
53+
} catch (NoSuchMethodException e) {
54+
if (configs == null || configs.isEmpty()) {
55+
throw new RuntimeException("Message payload processor class does not have default constructor", e);
56+
} else {
57+
throw new RuntimeException("Message payload processor class does not have constructor accepts map", e);
58+
}
59+
} catch (IllegalAccessException | InstantiationException | InvocationTargetException e) {
60+
throw new RuntimeException("Failed to create instance for message payload processor class", e);
61+
}
62+
}
63+
64+
public static MessagePayloadProcessorConfig convertFromSpec(Function.MessagePayloadProcessorSpec spec) {
65+
if (spec == null || isEmpty(spec.getClassName())) {
66+
return null;
67+
}
68+
69+
MessagePayloadProcessorConfig.MessagePayloadProcessorConfigBuilder bldr =
70+
MessagePayloadProcessorConfig.builder();
71+
72+
Type type = new TypeToken<Map<String, Object>>() {
73+
}.getType();
74+
Map<String, Object> configs = new Gson().fromJson(spec.getConfigs(), type);
75+
76+
bldr.className(spec.getClassName()).config(configs);
77+
78+
return bldr.build();
79+
}
80+
81+
public static Function.MessagePayloadProcessorSpec convert(MessagePayloadProcessorConfig config) {
82+
Function.MessagePayloadProcessorSpec.Builder bldr = Function.MessagePayloadProcessorSpec.newBuilder()
83+
.setClassName(config.getClassName());
84+
85+
if (config.getConfig() != null) {
86+
Type type = new TypeToken<Map<String, Object>>() {
87+
}.getType();
88+
String readerConfigString = new Gson().toJson(config.getConfig(), type);
89+
bldr.setConfigs(readerConfigString);
90+
}
91+
92+
return bldr.build();
93+
}
94+
}

Diff for: pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
162162
if (spec.getCryptoConfig() != null) {
163163
bldr.setCryptoSpec(CryptoUtils.convert(spec.getCryptoConfig()));
164164
}
165+
if (spec.getMessagePayloadProcessorConfig() != null) {
166+
bldr.setMessagePayloadProcessorSpec(
167+
MessagePayloadProcessorUtils.convert(spec.getMessagePayloadProcessorConfig()));
168+
}
165169
bldr.putAllConsumerProperties(spec.getConsumerProperties());
166170
bldr.setPoolMessages(spec.isPoolMessages());
167171
sourceSpecBuilder.putInputSpecs(topic, bldr.build());
@@ -303,6 +307,10 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
303307
if (input.getValue().hasCryptoSpec()) {
304308
consumerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(input.getValue().getCryptoSpec()));
305309
}
310+
if (input.getValue().hasMessagePayloadProcessorSpec()) {
311+
consumerConfig.setMessagePayloadProcessorConfig(MessagePayloadProcessorUtils.convertFromSpec(
312+
input.getValue().getMessagePayloadProcessorSpec()));
313+
}
306314
consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern());
307315
consumerConfig.setConsumerProperties(input.getValue().getConsumerPropertiesMap());
308316
consumerConfig.setPoolMessages(input.getValue().getPoolMessages());
@@ -352,7 +360,7 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
352360
if (!org.apache.commons.lang3.StringUtils.isEmpty(functionDetails.getSink().getConfigs())) {
353361
TypeReference<HashMap<String, Object>> typeRef =
354362
new TypeReference<HashMap<String, Object>>() {
355-
};
363+
};
356364
Map<String, Object> configMap;
357365
try {
358366
configMap =
@@ -513,9 +521,9 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf
513521
}
514522

515523
if (sinkConfig.getTopicToSerdeClassName() != null) {
516-
for (String serdeClassName : sinkConfig.getTopicToSerdeClassName().values()) {
517-
ValidatorUtils.validateSerde(serdeClassName, typeArg, inputFunction.getTypePool(), true);
518-
}
524+
for (String serdeClassName : sinkConfig.getTopicToSerdeClassName().values()) {
525+
ValidatorUtils.validateSerde(serdeClassName, typeArg, inputFunction.getTypePool(), true);
526+
}
519527
}
520528

521529
if (sinkConfig.getTopicToSchemaType() != null) {
@@ -544,6 +552,10 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf
544552
ValidatorUtils.validateCryptoKeyReader(consumerSpec.getCryptoConfig(),
545553
inputFunction.getTypePool(), false);
546554
}
555+
if (consumerSpec.getMessagePayloadProcessorConfig() != null) {
556+
ValidatorUtils.validateMessagePayloadProcessor(consumerSpec.getMessagePayloadProcessorConfig(),
557+
inputFunction.getTypePool());
558+
}
547559
}
548560
}
549561

0 commit comments

Comments
 (0)