Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][fn]: Added logLevel to change the log level of Function/Source/Sink Pod/Process to desired Level #363

Open
wants to merge 6 commits into
base: 3.1_ds
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public enum Runtime {

private String outputSerdeClassName;
private String logTopic;
private String logLevel;
private ProcessingGuarantees processingGuarantees;
// Do we want function instances to process data in the same order as in the input topics
// This essentially means that every partition of input topic is consumed by only one instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class SinkConfig {

private String deadLetterTopic;

private String logLevel;

private Map<String, Object> configs;
// This is a map of secretName(aka how the secret is going to be
// accessed in the function via context) to an object that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class SourceConfig {

private String schemaType;

private String logLevel;

private Map<String, Object> configs;
// This is a map of secretName(aka how the secret is going to be
// accessed in the function via context) to an object that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ abstract class FunctionDetailsCommand extends BaseCommand {
@Parameter(names = "--log-topic", description = "The topic to which the logs of a Pulsar Function are produced"
+ " #Java, Python, Go")
protected String logTopic;

@Parameter(names = "--logLevel", description = "Log level at which the logs of a Pulsar Function are produced"
+ " #Java")
protected String logLevel;
@Parameter(names = {"-st", "--schema-type"}, description = "The builtin schema type or "
+ "custom schema class name to be used for messages output by the function #Java")
protected String schemaType = "";
Expand Down Expand Up @@ -518,6 +520,9 @@ void processArguments() throws Exception {
if (null != logTopic) {
functionConfig.setLogTopic(logTopic);
}
if (null != logLevel) {
functionConfig.setLogLevel(logLevel);
}
if (null != className) {
functionConfig.setClassName(className);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ abstract class SinkDetailsCommand extends BaseCommand {
@Parameter(names = "--transform-function-config", description = "Configuration of the transform function "
+ "applied before the Sink")
protected String transformFunctionConfig;

@Parameter(names = "--logLevel", description = "Log level at which the logs of a Pulsar Sink are produced")
protected String logLevel;
protected SinkConfig sinkConfig;

private void mergeArgs() {
Expand Down Expand Up @@ -606,6 +607,10 @@ void processArguments() throws Exception {
sinkConfig.setTransformFunctionConfig(transformFunctionConfig);
}

if (null != logLevel) {
sinkConfig.setLogLevel(logLevel);
}

// check if configs are valid
validateSinkConfigs(sinkConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ abstract class SourceDetailsCommand extends BaseCommand {
@Parameter(names = "--secrets", description = "The map of secretName to an object that encapsulates "
+ "how the secret is fetched by the underlying secrets provider")
protected String secretsString;
@Parameter(names = "--logLevel", description = "Log level at which the logs of a Pulsar Source are produced")
protected String logLevel;

protected SourceConfig sourceConfig;

Expand Down Expand Up @@ -499,6 +501,10 @@ void processArguments() throws Exception {
sourceConfig.setSecrets(secretsMap);
}

if (null != logLevel) {
sourceConfig.setLogLevel(logLevel);
}

// check if source configs are valid
validateSourceConfigs(sourceConfig);
}
Expand Down
1 change: 1 addition & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ message FunctionDetails {
bool retainOrdering = 21;
bool retainKeyOrdering = 22;
SubscriptionPosition subscriptionPosition = 23;
string logLevel = 24;
}

message ConsumerSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@
</AppenderRef>
</Logger>
<Root>
<level>info</level>
<level>${sys:pulsar.log.level}</level>
<AppenderRef>
<ref>${sys:pulsar.log.appender}</ref>
<level>${sys:pulsar.log.level}</level>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
</AppenderRef>
</Logger>
<Root>
<level>info</level>
<level>${sys:pulsar.log.level}</level>
<AppenderRef>
<ref>Console</ref>
<level>${sys:pulsar.log.level}</level>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public static List<String> composeCmd(InstanceConfig instanceConfig,
Integer grpcPort,
Long expectedHealthCheckInterval,
String logConfigFile,
String logLevel,
String secretsProviderClassName,
String secretsProviderConfig,
Boolean installUserCodeDependencies,
Expand All @@ -92,7 +93,7 @@ public static List<String> composeCmd(InstanceConfig instanceConfig,
cmd.addAll(getCmd(instanceConfig, instanceFile, extraDependenciesDir, logDirectory,
originalCodeFileName, originalTransformFunctionFileName, pulsarServiceUrl, stateStorageServiceUrl,
authConfig, shardId, grpcPort, expectedHealthCheckInterval,
logConfigFile, secretsProviderClassName, secretsProviderConfig,
logConfigFile, logLevel, secretsProviderClassName, secretsProviderConfig,
installUserCodeDependencies, pythonDependencyRepository,
pythonExtraDependencyRepository, narExtractionDirectory,
functionInstanceClassPath, false, pulsarWebServiceUrl));
Expand Down Expand Up @@ -309,6 +310,7 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
Integer grpcPort,
Long expectedHealthCheckInterval,
String logConfigFile,
String logLevel,
String secretsProviderClassName,
String secretsProviderConfig,
Boolean installUserCodeDependencies,
Expand Down Expand Up @@ -354,6 +356,7 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
}
args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, systemFunctionInstanceClasspath));
}
args.add("-Dpulsar.log.level=" + logLevel);
args.add("-Dlog4j.configurationFile=" + logConfigFile);
args.add("-Dpulsar.function.log.dir=" + genFunctionLogFolder(logDirectory, instanceConfig));
args.add("-Dpulsar.function.log.file=" + String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
Expand Down Expand Up @@ -236,6 +237,10 @@ public class KubernetesRuntime implements Runtime {
case GO:
break;
}
String logLevel = instanceConfig.getFunctionDetails().getLogLevel();
if (StringUtils.isBlank(logLevel)) {
logLevel = "info";
}

this.authConfig = authConfig;

Expand Down Expand Up @@ -275,6 +280,7 @@ public class KubernetesRuntime implements Runtime {
grpcPort,
-1L,
logConfigFile,
logLevel,
secretsProviderClassName,
secretsProviderConfig,
installUserCodeDependencies,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ class ProcessRuntime implements Runtime {
case GO:
break;
}
String logLevel = instanceConfig.getFunctionDetails().getLogLevel();
if (StringUtils.isBlank(logLevel)) {
logLevel = "info";
}

this.extraDependenciesDir = extraDependenciesDir;
this.narExtractionDirectory = narExtractionDirectory;
this.processArgs = RuntimeUtils.composeCmd(
Expand All @@ -142,6 +147,7 @@ class ProcessRuntime implements Runtime {
instanceConfig.getPort(),
expectedHealthCheckInterval,
logConfigFile,
logLevel,
secretsProviderClassName,
secretsProviderConfig,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public void getAdditionalJavaRuntimeArguments(boolean k8sRuntime) throws Excepti
23,
1234L,
"logConfigFile",
"info",
"secretsProviderClassName",
"secretsProviderConfig",
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s
String expectedArgs = "exec java -cp " + classpath
+ extraDepsEnv
+ " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
+ " -Dpulsar.log.level=info"
+ " -Dlog4j.configurationFile=kubernetes_instance_log4j2.xml "
+ "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@

factory = createProcessRuntimeFactory(null);

verifyJavaInstance(config);

Check failure on line 222 in pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java

View workflow job for this annotation

GitHub Actions / CI - Unit - Other

ProcessRuntimeTest.testJavaConstructor

expected [53] but found [54]
}

@Test
Expand Down Expand Up @@ -324,6 +324,7 @@
String expectedArgs = "java -cp " + classpath
+ extraDepsEnv
+ " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
+ " -Dpulsar.log.level=info"
+ " -Dlog4j.configurationFile=java_instance_log4j2.xml "
+ "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.gson.reflect.TypeToken;
import java.io.File;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -70,6 +71,7 @@ public static class ExtractedFunctionDetails {

static final Integer MAX_PENDING_ASYNC_REQUESTS_DEFAULT = 1000;
static final Boolean FORWARD_SOURCE_MESSAGE_PROPERTY_DEFAULT = Boolean.TRUE;
private static final List<String> VALID_LOG_LEVELS = Arrays.asList("INFO", "DEBUG", "WARN", "ERROR");

private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.create();

Expand Down Expand Up @@ -272,6 +274,9 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
if (functionConfig.getLogTopic() != null) {
functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic());
}
if (functionConfig.getLogLevel() != null) {
functionDetailsBuilder.setLogLevel(functionConfig.getLogLevel());
}
if (functionConfig.getRuntime() != null) {
functionDetailsBuilder.setRuntime(FunctionCommon.convertRuntime(functionConfig.getRuntime()));
}
Expand Down Expand Up @@ -447,6 +452,9 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails)
if (!isEmpty(functionDetails.getLogTopic())) {
functionConfig.setLogTopic(functionDetails.getLogTopic());
}
if (!isEmpty(functionDetails.getLogLevel())) {
functionConfig.setLogLevel(functionDetails.getLogLevel());
}
if (functionDetails.getSink().getForwardSourceMessageProperty()) {
functionConfig.setForwardSourceMessageProperty(functionDetails.getSink().getForwardSourceMessageProperty());
}
Expand Down Expand Up @@ -806,6 +814,13 @@ public static void doCommonChecks(FunctionConfig functionConfig) {
}
}

if (!isEmpty(functionConfig.getLogLevel())) {
if (!VALID_LOG_LEVELS.contains(functionConfig.getLogLevel().toUpperCase())) {
throw new IllegalArgumentException(
String.format("LogLevel %s is invalid", functionConfig.getLogLevel()));
}
}

if (functionConfig.getParallelism() != null && functionConfig.getParallelism() <= 0) {
throw new IllegalArgumentException("Function parallelism must be a positive number");
}
Expand Down Expand Up @@ -1017,6 +1032,9 @@ public static FunctionConfig validateUpdate(FunctionConfig existingConfig, Funct
if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
mergedConfig.setLogTopic(newConfig.getLogTopic());
}
if (!StringUtils.isEmpty(newConfig.getLogLevel())) {
mergedConfig.setLogLevel(newConfig.getLogLevel());
}
if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees()
.equals(existingConfig.getProcessingGuarantees())) {
throw new IllegalArgumentException("Processing Guarantees cannot be altered");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -62,6 +63,8 @@
@Slf4j
public class SinkConfigUtils {

private static final List<String> VALID_LOG_LEVELS = Arrays.asList("INFO", "DEBUG", "WARN", "ERROR");

@Getter
@Setter
@AllArgsConstructor
Expand All @@ -88,6 +91,9 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
functionDetailsBuilder.setName(sinkConfig.getName());
}
functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
if (sinkConfig.getLogLevel() != null) {
functionDetailsBuilder.setLogLevel(sinkConfig.getLogLevel());
}
if (sinkConfig.getParallelism() != null) {
functionDetailsBuilder.setParallelism(sinkConfig.getParallelism());
} else {
Expand Down Expand Up @@ -396,6 +402,9 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
if (!isEmpty(functionDetails.getUserConfig())) {
sinkConfig.setTransformFunctionConfig(functionDetails.getUserConfig());
}
if (!isEmpty(functionDetails.getLogLevel())) {
sinkConfig.setLogLevel(functionDetails.getLogLevel());
}


return sinkConfig;
Expand Down Expand Up @@ -548,6 +557,13 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf
throw new IllegalArgumentException("Only one of retain ordering or retain key ordering can be set");
}

if (!isEmpty(sinkConfig.getLogLevel())) {
if (!VALID_LOG_LEVELS.contains(sinkConfig.getLogLevel().toUpperCase())) {
throw new IllegalArgumentException(
String.format("LogLevel %s is invalid", sinkConfig.getLogLevel()));
}
}

// validate user defined config if enabled and classloading is enabled
if (validateConnectorConfig) {
if (sinkFunction.isEnableClassloading()) {
Expand Down Expand Up @@ -712,6 +728,9 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne
if (newConfig.getSourceSubscriptionPosition() != null) {
mergedConfig.setSourceSubscriptionPosition(newConfig.getSourceSubscriptionPosition());
}
if (!StringUtils.isEmpty(newConfig.getLogLevel())) {
mergedConfig.setLogLevel(newConfig.getLogLevel());
}
return mergedConfig;
}

Expand Down
Loading
Loading