Skip to content

Commit 70bda8f

Browse files
committed
[Common][Server] Adapt configuration parser
- Load new common.yaml from config dir - Allow to specify dedicated configuration files to be loaded in the addition to common.yaml from the config dir - Ensure backward compatibility for old server.yaml - Add server type argument to ConfigurationParserUtils load method and adapt test cases
1 parent 06832fd commit 70bda8f

File tree

7 files changed

+120
-34
lines changed

7 files changed

+120
-34
lines changed

fluss-common/src/main/java/com/alibaba/fluss/config/GlobalConfiguration.java

+72-10
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.alibaba.fluss.config;
1818

1919
import com.alibaba.fluss.annotation.Internal;
20+
import com.alibaba.fluss.annotation.VisibleForTesting;
2021
import com.alibaba.fluss.exception.IllegalConfigurationException;
2122

2223
import org.slf4j.Logger;
@@ -29,6 +30,9 @@
2930
import java.io.FileInputStream;
3031
import java.io.IOException;
3132
import java.io.InputStreamReader;
33+
import java.util.ArrayList;
34+
import java.util.Collections;
35+
import java.util.List;
3236

3337
/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache
3438
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -46,19 +50,20 @@ public class GlobalConfiguration {
4650

4751
private static final Logger LOG = LoggerFactory.getLogger(GlobalConfiguration.class);
4852

49-
public static final String FLUSS_CONF_FILENAME = "server.yaml";
53+
@VisibleForTesting
54+
public static final String[] FLUSS_CONF_FILENAME = new String[] {"server.yaml", "common.yaml"};
5055

5156
// --------------------------------------------------------------------------------------------
5257

5358
private GlobalConfiguration() {}
5459

5560
/**
56-
* Load the configuration files from the config file specified by key {@link
61+
* Loads the configuration files from the config file specified by key {@link
5762
* #SERVER_CONFIG_FILE} in {@code dynamicProperties}. If no config file is specified, it'll load
58-
* the configuration from the specified {@code defaultConfigDir}.
63+
* the common configuration from the specified {@code defaultConfigDir}.
5964
*
6065
* <p>If the {@code dynamicProperties} is not null, then it is added to the loaded
61-
* configuration.
66+
* configuration. Dynamic configuration options take precedence over file configuration options.
6267
*
6368
* @param defaultConfigDir directory to load the configuration from when no config file is
6469
* specified in the dynamic properties
@@ -67,16 +72,42 @@ private GlobalConfiguration() {}
6772
*/
6873
public static Configuration loadConfiguration(
6974
final String defaultConfigDir, @Nullable final Configuration dynamicProperties) {
75+
return loadConfiguration(defaultConfigDir, Collections.emptyList(), dynamicProperties);
76+
}
7077

71-
File yamlConfigFile = null;
78+
/**
79+
* Loads the configuration as described in {@link GlobalConfiguration#loadConfiguration(String,
80+
* Configuration)}, but allows to specify a list of files that should be loaded from {@code
81+
* defaultConfigDir} in addition to the common configuration, in case the user does not specify
82+
* {@link #SERVER_CONFIG_FILE} in {@code dynamicProperties}.
83+
*
84+
* <p>The configuration files are read in the specified order. If multiple configuration files
85+
* are given, and a configuration option is present in at least two of them, the configuration
86+
* option in the <i>latest</i> configuration file that contains the option takes precedence.
87+
* Additionally, dynamic configuration options take precedence over configuration options given
88+
* in <i>any</i> file.
89+
*
90+
* @param defaultConfigDir see {@link GlobalConfiguration#loadConfiguration(String,
91+
* Configuration)}
92+
* @param additionalDefaultFiles a list of additional config files that should be loaded from
93+
* defaultConfigDir that will be read in the given order
94+
* @param dynamicProperties see {@link GlobalConfiguration#loadConfiguration(String,
95+
* Configuration)}
96+
*/
97+
public static Configuration loadConfiguration(
98+
final String defaultConfigDir,
99+
final List<String> additionalDefaultFiles,
100+
@Nullable final Configuration dynamicProperties) {
101+
102+
List<File> yamlConfigFiles = new ArrayList<>();
72103

73104
// first, try to get the config file name from the dynamic properties
74105
// user passed
75106
if (dynamicProperties != null && dynamicProperties.contains(SERVER_CONFIG_FILE)) {
76107
// get the config file name passed by user
77108
String configFileName = dynamicProperties.getString(SERVER_CONFIG_FILE);
78109
dynamicProperties.removeConfig(SERVER_CONFIG_FILE);
79-
yamlConfigFile = new File(configFileName);
110+
File yamlConfigFile = new File(configFileName);
80111
if (!yamlConfigFile.exists() && !yamlConfigFile.isFile()) {
81112
throw new IllegalConfigurationException(
82113
"The given configuration file name '"
@@ -85,9 +116,10 @@ public static Configuration loadConfiguration(
85116
+ yamlConfigFile.getAbsolutePath()
86117
+ ") does not describe an existing file.");
87118
}
119+
yamlConfigFiles.add(yamlConfigFile);
88120
}
89121

90-
if (yamlConfigFile == null) {
122+
if (yamlConfigFiles.isEmpty()) {
91123
// try to load from the default conf dir
92124
if (defaultConfigDir == null) {
93125
throw new IllegalArgumentException(
@@ -102,11 +134,41 @@ public static Configuration loadConfiguration(
102134
+ confDirFile.getAbsolutePath()
103135
+ ") does not describe an existing directory.");
104136
}
105-
// get Fluss yaml configuration file from dir
106-
yamlConfigFile = new File(confDirFile, FLUSS_CONF_FILENAME);
137+
138+
// get Fluss yaml configuration files from dir
139+
final File serverYamlFile = new File(confDirFile, FLUSS_CONF_FILENAME[0]);
140+
final File commonYamlFile = new File(confDirFile, FLUSS_CONF_FILENAME[1]);
141+
142+
// 1. check if old and new configuration files are mixed which is not supported
143+
if (serverYamlFile.exists() && commonYamlFile.exists()) {
144+
throw new IllegalConfigurationException(
145+
"Only one of "
146+
+ FLUSS_CONF_FILENAME[0]
147+
+ " and "
148+
+ FLUSS_CONF_FILENAME[1]
149+
+ " may be specified.");
150+
}
151+
152+
// 2. backward compatability, use server.yaml
153+
if (serverYamlFile.exists()) {
154+
yamlConfigFiles.add(new File(confDirFile, FLUSS_CONF_FILENAME[0]));
155+
}
156+
157+
// 3. latest configuration setup: load common.yaml and additionally specified, dedicated
158+
// configuration files
159+
if (commonYamlFile.exists()) {
160+
yamlConfigFiles.add(new File(confDirFile, FLUSS_CONF_FILENAME[1]));
161+
162+
for (String additionalDefaultFile : additionalDefaultFiles) {
163+
yamlConfigFiles.add(new File(confDirFile, additionalDefaultFile));
164+
}
165+
}
107166
}
108167

109-
Configuration configuration = loadYAMLResource(yamlConfigFile);
168+
Configuration configuration = loadYAMLResource(yamlConfigFiles.remove(0));
169+
for (File yamlConfigFile : yamlConfigFiles) {
170+
configuration.addAll(loadYAMLResource(yamlConfigFile));
171+
}
110172

111173
logConfiguration("Loading", configuration);
112174

fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.alibaba.fluss.server;
1818

19+
import com.alibaba.fluss.cluster.ServerType;
1920
import com.alibaba.fluss.config.ConfigOptions;
2021
import com.alibaba.fluss.config.Configuration;
2122
import com.alibaba.fluss.exception.FlussException;
@@ -71,9 +72,10 @@ protected ServerBase(Configuration conf) {
7172

7273
private Thread shutDownHook;
7374

74-
protected static Configuration loadConfiguration(String[] args, String serverClassName) {
75+
protected static Configuration loadConfiguration(
76+
String[] args, String serverClassName, ServerType serverType) {
7577
try {
76-
return ConfigurationParserUtils.loadCommonConfiguration(args, serverClassName);
78+
return ConfigurationParserUtils.loadConfiguration(args, serverClassName, serverType);
7779
} catch (FlussParseException fpe) {
7880
LOG.error("Could not load the configuration.", fpe);
7981
System.exit(FAILURE_EXIT_CODE);

fluss-server/src/main/java/com/alibaba/fluss/server/cli/CommandLineOptions.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class CommandLineOptions {
3333
.required(true)
3434
.hasArg(true)
3535
.argName("configuration directory")
36-
.desc("Directory which contains the configuration file server.yaml.")
36+
.desc("Directory which contains the YAML configuration files.")
3737
.build();
3838

3939
public static final Option DYNAMIC_PROPERTY_OPTION =

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public class CoordinatorServer extends ServerBase {
7070

7171
public static final String DEFAULT_DATABASE = "fluss";
7272
private static final String SERVER_NAME = "CoordinatorServer";
73+
private static final ServerType SERVER_TYPE = ServerType.COORDINATOR;
7374

7475
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorServer.class);
7576

@@ -127,7 +128,7 @@ public CoordinatorServer(Configuration conf) {
127128

128129
public static void main(String[] args) {
129130
Configuration configuration =
130-
loadConfiguration(args, CoordinatorServer.class.getSimpleName());
131+
loadConfiguration(args, CoordinatorServer.class.getSimpleName(), SERVER_TYPE);
131132
CoordinatorServer coordinatorServer = new CoordinatorServer(configuration);
132133
startServer(coordinatorServer);
133134
}
@@ -136,7 +137,7 @@ public static void main(String[] args) {
136137
protected void startServices() throws Exception {
137138
synchronized (lock) {
138139
LOG.info("Initializing Coordinator services.");
139-
List<Endpoint> endpoints = Endpoint.loadBindEndpoints(conf, ServerType.COORDINATOR);
140+
List<Endpoint> endpoints = Endpoint.loadBindEndpoints(conf, SERVER_TYPE);
140141
this.serverId = UUID.randomUUID().toString();
141142

142143
// for metrics

fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
public class TabletServer extends ServerBase {
7373

7474
private static final String SERVER_NAME = "TabletServer";
75+
private static final ServerType SERVER_TYPE = ServerType.TABLET_SERVER;
7576

7677
private static final Logger LOG = LoggerFactory.getLogger(TabletServer.class);
7778

@@ -136,7 +137,8 @@ public TabletServer(Configuration conf) {
136137
}
137138

138139
public static void main(String[] args) {
139-
Configuration configuration = loadConfiguration(args, TabletServer.class.getSimpleName());
140+
Configuration configuration =
141+
loadConfiguration(args, TabletServer.class.getSimpleName(), SERVER_TYPE);
140142
TabletServer tabletServer = new TabletServer(configuration);
141143
startServer(tabletServer);
142144
}
@@ -146,7 +148,7 @@ protected void startServices() throws Exception {
146148
synchronized (lock) {
147149
LOG.info("Initializing Tablet services.");
148150

149-
List<Endpoint> endpoints = Endpoint.loadBindEndpoints(conf, ServerType.TABLET_SERVER);
151+
List<Endpoint> endpoints = Endpoint.loadBindEndpoints(conf, SERVER_TYPE);
150152

151153
// for metrics
152154
this.metricRegistry = MetricRegistry.create(conf, pluginManager);

fluss-server/src/main/java/com/alibaba/fluss/server/utils/ConfigurationParserUtils.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.alibaba.fluss.server.utils;
1818

19+
import com.alibaba.fluss.cluster.ServerType;
1920
import com.alibaba.fluss.config.Configuration;
2021
import com.alibaba.fluss.config.ConfigurationUtils;
2122
import com.alibaba.fluss.config.GlobalConfiguration;
@@ -27,23 +28,29 @@
2728
import org.slf4j.Logger;
2829
import org.slf4j.LoggerFactory;
2930

31+
import java.util.Collections;
32+
3033
/**
3134
* Utility class to extract related parameters from {@link Configuration} and to sanity check them.
3235
*/
3336
public class ConfigurationParserUtils {
3437

3538
private static final Logger LOG = LoggerFactory.getLogger(ConfigurationParserUtils.class);
3639

40+
private static final String COORDINATOR_SERVER_CONF_FILE = "coordinator-server.yaml";
41+
private static final String TABLET_SERVER_CONF_FILE = "tablet-server.yaml";
42+
3743
/**
3844
* Generate configuration from only the config file and dynamic properties.
3945
*
4046
* @param args the commandline arguments
4147
* @param cmdLineSyntax the syntax for this application
48+
* @param serverType the server type
4249
* @return generated configuration
4350
* @throws FlussParseException if the configuration cannot be generated
4451
*/
45-
public static Configuration loadCommonConfiguration(String[] args, String cmdLineSyntax)
46-
throws FlussParseException {
52+
public static Configuration loadConfiguration(
53+
String[] args, String cmdLineSyntax, ServerType serverType) throws FlussParseException {
4754
final CommandLineParser<ServerConfiguration> commandLineParser =
4855
new CommandLineParser<>(new ServerConfigurationParserFactory());
4956

@@ -59,7 +66,12 @@ public static Configuration loadCommonConfiguration(String[] args, String cmdLin
5966

6067
final Configuration dynamicProperties =
6168
ConfigurationUtils.createConfiguration(serverConfiguration.getDynamicProperties());
69+
6270
return GlobalConfiguration.loadConfiguration(
63-
serverConfiguration.getConfigDir(), dynamicProperties);
71+
serverConfiguration.getConfigDir(),
72+
serverType == ServerType.COORDINATOR
73+
? Collections.singletonList(COORDINATOR_SERVER_CONF_FILE)
74+
: Collections.singletonList(TABLET_SERVER_CONF_FILE),
75+
dynamicProperties);
6476
}
6577
}

fluss-server/src/test/java/com/alibaba/fluss/server/utils/ConfigurationParserUtilsTest.java

+21-14
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,17 @@
1616

1717
package com.alibaba.fluss.server.utils;
1818

19+
import com.alibaba.fluss.cluster.ServerType;
1920
import com.alibaba.fluss.config.ConfigBuilder;
2021
import com.alibaba.fluss.config.ConfigOptions;
2122
import com.alibaba.fluss.config.Configuration;
2223
import com.alibaba.fluss.config.GlobalConfiguration;
2324
import com.alibaba.fluss.server.exception.FlussParseException;
2425

2526
import org.apache.commons.cli.MissingOptionException;
26-
import org.junit.jupiter.api.Test;
2727
import org.junit.jupiter.api.io.TempDir;
28+
import org.junit.jupiter.params.ParameterizedTest;
29+
import org.junit.jupiter.params.provider.EnumSource;
2830

2931
import java.nio.file.Files;
3032
import java.nio.file.Path;
@@ -36,8 +38,9 @@
3638
/** Test for {@link ConfigurationParserUtils}. */
3739
public class ConfigurationParserUtilsTest {
3840

39-
@Test
40-
void testLoadCommonConfiguration(@TempDir Path tempFolder) throws Exception {
41+
@ParameterizedTest
42+
@EnumSource(ServerType.class)
43+
void testLoadConfiguration(ServerType serverType, @TempDir Path tempFolder) throws Exception {
4144
Path yamlFile = tempFolder.resolve("server.yaml");
4245
Files.write(yamlFile, Collections.singleton("coordinator.port: 9124"));
4346
String confDir = tempFolder.toAbsolutePath().toString();
@@ -51,8 +54,8 @@ void testLoadCommonConfiguration(@TempDir Path tempFolder) throws Exception {
5154
};
5255

5356
Configuration configuration =
54-
ConfigurationParserUtils.loadCommonConfiguration(
55-
args, ConfigurationParserUtilsTest.class.getSimpleName());
57+
ConfigurationParserUtils.loadConfiguration(
58+
args, ConfigurationParserUtilsTest.class.getSimpleName(), serverType);
5659
// should respect the configurations in args
5760
assertThat(configuration.getString(ConfigBuilder.key(key).stringType().noDefaultValue()))
5861
.isEqualTo(value);
@@ -61,39 +64,43 @@ void testLoadCommonConfiguration(@TempDir Path tempFolder) throws Exception {
6164
assertThat(configuration.getString(ConfigOptions.COORDINATOR_PORT)).isEqualTo("9124");
6265
}
6366

64-
@Test
65-
void testLoadWithUserSpecifiedConfigFile(@TempDir Path tempFolder) throws Exception {
67+
@ParameterizedTest
68+
@EnumSource(ServerType.class)
69+
void testLoadWithUserSpecifiedConfigFile(ServerType serverType, @TempDir Path tempFolder)
70+
throws Exception {
6671
Path yamlFile = tempFolder.resolve("server.yaml");
6772
Files.write(yamlFile, Collections.singleton("coordinator.port: 9124"));
6873
String confDir = tempFolder.toAbsolutePath().toString();
6974

7075
Path userDefinedConfigFile = tempFolder.resolve("user-defined-server.yaml");
7176
Files.write(yamlFile, Collections.singleton("coordinator.port: 1000"));
7277

73-
final String configKey = GlobalConfiguration.FLUSS_CONF_FILENAME;
78+
final String configKey = GlobalConfiguration.FLUSS_CONF_FILENAME[0];
7479
final String configValue = userDefinedConfigFile.toString();
7580

7681
final String[] args = {
7782
"--configDir", confDir, String.format("-D%s=%s", configKey, configValue)
7883
};
7984
Configuration configuration =
80-
ConfigurationParserUtils.loadCommonConfiguration(
81-
args, ConfigurationParserUtilsTest.class.getSimpleName());
85+
ConfigurationParserUtils.loadConfiguration(
86+
args, ConfigurationParserUtilsTest.class.getSimpleName(), serverType);
8287
// should use the configurations in the user-defined-server.yaml
8388
assertThat(
8489
configuration.get(
8590
ConfigBuilder.key("coordinator.port").intType().noDefaultValue()))
8691
.isEqualTo(1000);
8792
}
8893

89-
@Test
90-
void testLoadCommonConfigurationThrowException() {
94+
@ParameterizedTest
95+
@EnumSource(ServerType.class)
96+
void testLoadConfigurationThrowException(ServerType serverType) {
9197
// should throw exception when miss options 'c'('configDir')
9298
assertThatThrownBy(
9399
() ->
94-
ConfigurationParserUtils.loadCommonConfiguration(
100+
ConfigurationParserUtils.loadConfiguration(
95101
new String[0],
96-
ConfigurationParserUtilsTest.class.getSimpleName()))
102+
ConfigurationParserUtilsTest.class.getSimpleName(),
103+
serverType))
97104
.isInstanceOf(FlussParseException.class)
98105
.hasMessageContaining("Failed to parse the command line arguments")
99106
.cause()

0 commit comments

Comments
 (0)