Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.util.NetUtils;
Expand All @@ -32,7 +33,9 @@
import java.io.IOException;
import java.net.BindException;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/** Tools for starting the Actor Systems used to run the JobManager and TaskManager actors. */
public class ActorSystemBootstrapTools {
Expand Down Expand Up @@ -241,6 +244,20 @@ public static ActorSystem startLocalActorSystem(
}
}

/**
* Converts the given Pekko {@link Config} into a flattened {@link Map}.
*
* @param config The Pekko configuration
* @return A map of configuration keys to string values
*/
private static Map<String, String> toMap(Config config) {
return config.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> String.valueOf(entry.getValue().unwrapped())));
}

/**
* Starts an Actor System with given Pekko config.
*
Expand All @@ -251,7 +268,11 @@ public static ActorSystem startLocalActorSystem(
*/
private static ActorSystem startActorSystem(
Config config, String actorSystemName, Logger logger) {
logger.debug("Using pekko configuration\n {}", config);
if (logger.isDebugEnabled()) {
logger.debug(
"Using pekko configuration\n {}",
ConfigurationUtils.hideSensitiveValues(toMap(config)));
}
ActorSystem actorSystem = PekkoUtils.createActorSystem(actorSystemName, config);

logger.info("Actor system started at {}", PekkoUtils.getAddress(actorSystem));
Expand Down