Skip to content

[improve][fn]: Added logLevel to change the log level of Function/Source/Sink Pod/Process to desired Level #23844

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public enum Runtime {

private String outputSerdeClassName;
private String logTopic;
private String logLevel;
private ProcessingGuarantees processingGuarantees;
// Do we want function instances to process data in the same order as in the input topics
// This essentially means that every partition of input topic is consumed by only one instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,5 @@ public class SinkConfig {
private String transformFunctionClassName;
private String transformFunctionConfig;
private String logTopic;
private String logLevel;
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,5 @@ public class SourceConfig {
// batchBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED
private String batchBuilder;
private String logTopic;
private String logLevel;
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ abstract class FunctionDetailsCommand extends BaseCommand {
+ " #Java, Python, Go")
protected String logTopic;

@Option(names = "--log-level", description = "Log level at which the logs of a Pulsar Function are produced"
+ " #Java")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@Option(names = "--log-level", description = "Log level at which the logs of a Pulsar Function are produced"
+ " #Java")
@Option(names = "--log-level", description = "Log level at which the logs of a Pulsar Function are produced")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current changes is limited to Java runtime only.

Copy link
Contributor Author

@mukesh154 mukesh154 Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added functionality for Python runtime as well & committed your suggestion.

protected String logLevel;

@Option(names = {"-st", "--schema-type"}, description = "The builtin schema type or "
+ "custom schema class name to be used for messages output by the function #Java")
protected String schemaType = "";
Expand Down Expand Up @@ -506,6 +510,9 @@ void processArguments() throws Exception {
if (null != logTopic) {
functionConfig.setLogTopic(logTopic);
}
if (null != logLevel) {
functionConfig.setLogLevel(logLevel);
}
if (null != className) {
functionConfig.setClassName(className);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,8 @@ abstract class SinkDetailsCommand extends BaseCommand {
protected String transformFunctionConfig;
@Option(names = "--log-topic", description = "The topic to which the logs of a Pulsar Sink are produced")
protected String logTopic;
@Option(names = "--log-level", description = "Log level at which the logs of a Pulsar Sink are produced")
protected String logLevel;
@Option(names = "--runtime-flags", description = "Any flags that you want to pass to a runtime"
+ " (for process & Kubernetes runtime only).")
protected String runtimeFlags;
Expand Down Expand Up @@ -611,6 +613,9 @@ void processArguments() throws Exception {
if (null != logTopic) {
sinkConfig.setLogTopic(logTopic);
}
if (null != logLevel) {
sinkConfig.setLogLevel(logLevel);
}
if (null != runtimeFlags) {
sinkConfig.setRuntimeFlags(runtimeFlags);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ abstract class SourceDetailsCommand extends BaseCommand {
protected String secretsString;
@Option(names = "--log-topic", description = "The topic to which the logs of a Pulsar Sink are produced")
protected String logTopic;
@Option(names = "--log-level", description = "Log level at which the logs of a Pulsar Source are produced")
protected String logLevel;
@Option(names = "--runtime-flags", description = "Any flags that you want to pass to a runtime"
+ " (for process & Kubernetes runtime only).")
protected String runtimeFlags;
Expand Down Expand Up @@ -505,6 +507,9 @@ void processArguments() throws Exception {
if (null != logTopic) {
sourceConfig.setLogTopic(logTopic);
}
if (null != logLevel) {
sourceConfig.setLogLevel(logLevel);
}
if (null != runtimeFlags) {
sourceConfig.setRuntimeFlags(runtimeFlags);
}
Expand Down
1 change: 1 addition & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ message FunctionDetails {
bool retainOrdering = 21;
bool retainKeyOrdering = 22;
SubscriptionPosition subscriptionPosition = 23;
string logLevel = 24;
}

message ConsumerSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@
</AppenderRef>
</Logger>
<Root>
<level>info</level>
<level>${sys:pulsar.log.level}</level>
<AppenderRef>
<ref>${sys:pulsar.log.appender}</ref>
<level>${sys:pulsar.log.level}</level>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
</AppenderRef>
</Logger>
<Root>
<level>info</level>
<level>${sys:pulsar.log.level}</level>
<AppenderRef>
<ref>Console</ref>
<level>${sys:pulsar.log.level}</level>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public static List<String> composeCmd(InstanceConfig instanceConfig,
Integer grpcPort,
Long expectedHealthCheckInterval,
String logConfigFile,
String logLevel,
String secretsProviderClassName,
String secretsProviderConfig,
Boolean installUserCodeDependencies,
Expand All @@ -92,7 +93,7 @@ public static List<String> composeCmd(InstanceConfig instanceConfig,
cmd.addAll(getCmd(instanceConfig, instanceFile, extraDependenciesDir, logDirectory,
originalCodeFileName, originalTransformFunctionFileName, pulsarServiceUrl, stateStorageServiceUrl,
authConfig, shardId, grpcPort, expectedHealthCheckInterval,
logConfigFile, secretsProviderClassName, secretsProviderConfig,
logConfigFile, logLevel, secretsProviderClassName, secretsProviderConfig,
installUserCodeDependencies, pythonDependencyRepository,
pythonExtraDependencyRepository, narExtractionDirectory,
functionInstanceClassPath, false, pulsarWebServiceUrl));
Expand Down Expand Up @@ -315,6 +316,7 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
Integer grpcPort,
Long expectedHealthCheckInterval,
String logConfigFile,
String logLevel,
String secretsProviderClassName,
String secretsProviderConfig,
Boolean installUserCodeDependencies,
Expand Down Expand Up @@ -360,6 +362,7 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
}
args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, systemFunctionInstanceClasspath));
}
args.add("-Dpulsar.log.level=" + logLevel);
args.add("-Dlog4j.configurationFile=" + logConfigFile);
args.add("-Dpulsar.function.log.dir=" + genFunctionLogFolder(logDirectory, instanceConfig));
args.add("-Dpulsar.function.log.file=" + String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
Expand Down Expand Up @@ -236,6 +237,10 @@ public class KubernetesRuntime implements Runtime {
case GO:
break;
}
String logLevel = instanceConfig.getFunctionDetails().getLogLevel();
if (StringUtils.isBlank(logLevel)) {
logLevel = "info";
}

this.authConfig = authConfig;

Expand Down Expand Up @@ -275,6 +280,7 @@ public class KubernetesRuntime implements Runtime {
grpcPort,
-1L,
logConfigFile,
logLevel,
secretsProviderClassName,
secretsProviderConfig,
installUserCodeDependencies,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ class ProcessRuntime implements Runtime {
case GO:
break;
}
String logLevel = instanceConfig.getFunctionDetails().getLogLevel();
if (StringUtils.isBlank(logLevel)) {
logLevel = "info";
}

this.extraDependenciesDir = extraDependenciesDir;
this.narExtractionDirectory = narExtractionDirectory;
this.processArgs = RuntimeUtils.composeCmd(
Expand All @@ -142,6 +147,7 @@ class ProcessRuntime implements Runtime {
instanceConfig.getPort(),
expectedHealthCheckInterval,
logConfigFile,
logLevel,
secretsProviderClassName,
secretsProviderConfig,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public void getAdditionalJavaRuntimeArguments(boolean k8sRuntime) throws Excepti
23,
1234L,
"logConfigFile",
"info",
"secretsProviderClassName",
"secretsProviderConfig",
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s
String expectedArgs = "exec java -cp " + classpath
+ extraDepsEnv
+ " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
+ " -Dpulsar.log.level=info"
+ " -Dlog4j.configurationFile=kubernetes_instance_log4j2.xml "
+ "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webS
String expectedArgs = "java -cp " + classpath
+ extraDepsEnv
+ " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
+ " -Dpulsar.log.level=info"
+ " -Dlog4j.configurationFile=java_instance_log4j2.xml "
+ "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.gson.reflect.TypeToken;
import java.io.File;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -70,6 +71,7 @@ public static class ExtractedFunctionDetails {

static final Integer MAX_PENDING_ASYNC_REQUESTS_DEFAULT = 1000;
static final Boolean FORWARD_SOURCE_MESSAGE_PROPERTY_DEFAULT = Boolean.TRUE;
private static final List<String> VALID_LOG_LEVELS = Arrays.asList("OFF", "FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE", "ALL");

private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.create();

Expand Down Expand Up @@ -272,6 +274,9 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
if (functionConfig.getLogTopic() != null) {
functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic());
}
if (functionConfig.getLogLevel() != null) {
functionDetailsBuilder.setLogLevel(functionConfig.getLogLevel());
}
if (functionConfig.getRuntime() != null) {
functionDetailsBuilder.setRuntime(FunctionCommon.convertRuntime(functionConfig.getRuntime()));
}
Expand Down Expand Up @@ -447,6 +452,9 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails)
if (!isEmpty(functionDetails.getLogTopic())) {
functionConfig.setLogTopic(functionDetails.getLogTopic());
}
if (!isEmpty(functionDetails.getLogLevel())) {
functionConfig.setLogLevel(functionDetails.getLogLevel());
}
if (functionDetails.getSink().getForwardSourceMessageProperty()) {
functionConfig.setForwardSourceMessageProperty(functionDetails.getSink().getForwardSourceMessageProperty());
}
Expand Down Expand Up @@ -806,6 +814,13 @@ public static void doCommonChecks(FunctionConfig functionConfig) {
}
}

if (!isEmpty(functionConfig.getLogLevel())) {
if (!VALID_LOG_LEVELS.contains(functionConfig.getLogLevel().toUpperCase())) {
throw new IllegalArgumentException(
String.format("LogLevel %s is invalid", functionConfig.getLogLevel()));
}
}

if (functionConfig.getParallelism() != null && functionConfig.getParallelism() <= 0) {
throw new IllegalArgumentException("Function parallelism must be a positive number");
}
Expand Down Expand Up @@ -1027,6 +1042,9 @@ public static FunctionConfig validateUpdate(FunctionConfig existingConfig, Funct
if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
mergedConfig.setLogTopic(newConfig.getLogTopic());
}
if (!StringUtils.isEmpty(newConfig.getLogLevel())) {
mergedConfig.setLogLevel(newConfig.getLogLevel());
}
if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees()
.equals(existingConfig.getProcessingGuarantees())) {
throw new IllegalArgumentException("Processing Guarantees cannot be altered");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -62,6 +63,8 @@
@Slf4j
public class SinkConfigUtils {

private static final List<String> VALID_LOG_LEVELS = Arrays.asList("OFF", "FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE", "ALL");

@Getter
@Setter
@AllArgsConstructor
Expand Down Expand Up @@ -90,6 +93,9 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
if (sinkConfig.getLogTopic() != null) {
functionDetailsBuilder.setLogTopic(sinkConfig.getLogTopic());
}
if (sinkConfig.getLogLevel() != null) {
functionDetailsBuilder.setLogLevel(sinkConfig.getLogLevel());
}
functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
if (sinkConfig.getParallelism() != null) {
functionDetailsBuilder.setParallelism(sinkConfig.getParallelism());
Expand Down Expand Up @@ -327,6 +333,9 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
if (!isEmpty(functionDetails.getLogTopic())) {
sinkConfig.setLogTopic(functionDetails.getLogTopic());
}
if (!isEmpty(functionDetails.getLogLevel())) {
sinkConfig.setLogLevel(functionDetails.getLogLevel());
}

sinkConfig.setProcessingGuarantees(convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));

Expand Down Expand Up @@ -439,6 +448,13 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf
}
}

if (!isEmpty(sinkConfig.getLogLevel())) {
if (!VALID_LOG_LEVELS.contains(sinkConfig.getLogLevel().toUpperCase())) {
throw new IllegalArgumentException(
String.format("LogLevel %s is invalid", sinkConfig.getLogLevel()));
}
}

if (sinkConfig.getParallelism() != null && sinkConfig.getParallelism() <= 0) {
throw new IllegalArgumentException("Sink parallelism must be a positive number");
}
Expand Down Expand Up @@ -628,6 +644,9 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne
if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
mergedConfig.setLogTopic(newConfig.getLogTopic());
}
if (!StringUtils.isEmpty(newConfig.getLogLevel())) {
mergedConfig.setLogLevel(newConfig.getLogLevel());
}

if (newConfig.getInputs() != null) {
newConfig.getInputs().forEach((topicName -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Getter;
Expand Down Expand Up @@ -56,6 +58,8 @@
@Slf4j
public class SourceConfigUtils {

private static final List<String> VALID_LOG_LEVELS = Arrays.asList("OFF", "FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE", "ALL");

@Getter
@Setter
@AllArgsConstructor
Expand Down Expand Up @@ -83,6 +87,9 @@ public static FunctionDetails convert(SourceConfig sourceConfig, ExtractedSource
if (sourceConfig.getLogTopic() != null) {
functionDetailsBuilder.setLogTopic(sourceConfig.getLogTopic());
}
if (sourceConfig.getLogLevel() != null) {
functionDetailsBuilder.setLogLevel(sourceConfig.getLogLevel());
}
functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
if (sourceConfig.getParallelism() != null) {
functionDetailsBuilder.setParallelism(sourceConfig.getParallelism());
Expand Down Expand Up @@ -241,6 +248,9 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
if (!isEmpty(functionDetails.getLogTopic())) {
sourceConfig.setLogTopic(functionDetails.getLogTopic());
}
if (!isEmpty(functionDetails.getLogLevel())) {
sourceConfig.setLogLevel(functionDetails.getLogLevel());
}
if (functionDetails.hasResources()) {
Resources resources = new Resources();
resources.setCpu(functionDetails.getResources().getCpu());
Expand Down Expand Up @@ -281,6 +291,12 @@ public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sour
String.format("LogTopic topic %s is invalid", sourceConfig.getLogTopic()));
}
}
if (!isEmpty(sourceConfig.getLogLevel())) {
if (!VALID_LOG_LEVELS.contains(sourceConfig.getLogLevel().toUpperCase())) {
throw new IllegalArgumentException(
String.format("LogLevel %s is invalid", sourceConfig.getLogLevel()));
}
}
if (sourceConfig.getParallelism() != null && sourceConfig.getParallelism() <= 0) {
throw new IllegalArgumentException("Source parallelism must be a positive number");
}
Expand Down Expand Up @@ -410,6 +426,9 @@ public static SourceConfig validateUpdate(SourceConfig existingConfig, SourceCon
if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
mergedConfig.setLogTopic(newConfig.getLogTopic());
}
if (!StringUtils.isEmpty(newConfig.getLogLevel())) {
mergedConfig.setLogLevel(newConfig.getLogLevel());
}
if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees()
.equals(existingConfig.getProcessingGuarantees())) {
throw new IllegalArgumentException("Processing Guarantees cannot be altered");
Expand Down
Loading
Loading