Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/**
* Copyright Pravega Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.pravega.connectors.flink.config;

import io.pravega.client.ClientConfig;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.description.Description;

/**
* Details about each configuration could be found at {@link ClientConfig}.
*/
public final class PravegaClientConfig {
public static final String CLIENT_PREFIX = "pravega.";
public static final String CLIENT_SECURITY_PREFIX = "security.";

public static final ConfigOption<String> DEFAULT_SCOPE =
ConfigOptions.key(CLIENT_PREFIX + "defaultScope")
.stringType()
.noDefaultValue()
.withDescription("Configures the default Pravega scope, to resolve unqualified stream names and to support reader groups.");
public static final ConfigOption<String> CONTROLLER_URI =
ConfigOptions.key(CLIENT_PREFIX + "controllerURI")
.stringType()
.noDefaultValue()
.withDescription(Description.builder()
.text("Configures the Pravega controller RPC URI.")
.linebreak()
.text("This can be of 2 types:")
.linebreak()
.text("1. tcp://ip1:port1,ip2:port2,...")
.linebreak()
.text("This is used if the controller endpoints are static and can be directly accessed.")
.linebreak()
.text("2. pravega://ip1:port1,ip2:port2,...")
.linebreak()
.text("This is used to autodiscovery the controller endpoints from an initial controller list.")
.build());
public static final ConfigOption<String> USERNAME =
ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "username")
.stringType()
.noDefaultValue()
.withDescription("Username passed to Pravega for authentication and authorizing the access.");
public static final ConfigOption<String> PASSWORD =
ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "password")
.stringType()
.noDefaultValue()
.withDescription("Password passed to Pravega for authentication and authorizing the access.");
public static final ConfigOption<String> TRUST_STORE =
ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "trustStore")
.stringType()
.noDefaultValue()
.withDescription(Description.builder()
.text("Path to an optional truststore. If this is null or empty, the default JVM trust store is used.")
.linebreak()
.text("This is currently expected to be a signing certificate for the certification authority.")
.build()
);
public static final ConfigOption<Boolean> VALIDATE_HOST_NAME =
ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "validateHostName")
.booleanType()
.noDefaultValue()
.withDescription("Flag to decide whether to validate the hostname on incoming requests.");
public static final ConfigOption<Integer> MAX_CONNECTION_PER_SEGMENT_STORE =
ConfigOptions.key(CLIENT_PREFIX + "maxConnectionsPerSegmentStore")
.intType()
.noDefaultValue()
.withDescription("Maximum number of connections per Segment store to be used by connection pooling.");
public static final ConfigOption<Boolean> ENABLE_TLS_TO_CONTROLLER =
ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "enableTlsToController")
.booleanType()
.noDefaultValue()
.withDescription("Flag to decide whether to enable TLS for client's communication with the Controller.");
public static final ConfigOption<Boolean> ENABLE_TLS_TO_SEGMENT_STORE =
ConfigOptions.key(CLIENT_PREFIX + CLIENT_SECURITY_PREFIX + "enableTlsToSegmentStore")
.booleanType()
.noDefaultValue()
.withDescription("Flag to decide whether to enable TLS for client's communication with the Controller.");

private PravegaClientConfig() {
// This is a constant class.
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Copyright Pravega Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.pravega.connectors.flink.config;

import io.pravega.client.ClientConfig;
import io.pravega.shared.security.auth.DefaultCredentials;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;

import javax.annotation.Nullable;
import java.net.URI;
import java.util.Map;
import java.util.Properties;

import static io.pravega.connectors.flink.util.FlinkPravegaUtils.isCredentialsLoadDynamic;

/**
* Helper methods for {@link PravegaClientConfig}.
*/
public final class PravegaClientConfigUtils {
/**
* A builder for building the Pravega {@link ClientConfig} instance.
*
* @param pravegaClientConfig The configuration.
* @return A Pravega {@link ClientConfig} instance.
*/
public static ClientConfig buildClientConfigFromProperties(Configuration pravegaClientConfig) {
ClientConfig.ClientConfigBuilder builder = ClientConfig.builder();
builder.controllerURI(URI.create(pravegaClientConfig.get(PravegaClientConfig.CONTROLLER_URI)));
if (!isCredentialsLoadDynamic() &&
pravegaClientConfig.getOptional(PravegaClientConfig.USERNAME).isPresent() &&
pravegaClientConfig.getOptional(PravegaClientConfig.PASSWORD).isPresent()) {
builder.credentials(new DefaultCredentials(
pravegaClientConfig.get(PravegaClientConfig.PASSWORD),
pravegaClientConfig.get(PravegaClientConfig.USERNAME))
);
}
pravegaClientConfig.getOptional(PravegaClientConfig.VALIDATE_HOST_NAME).ifPresent(builder::validateHostName);
pravegaClientConfig.getOptional(PravegaClientConfig.TRUST_STORE).ifPresent(builder::trustStore);
pravegaClientConfig.getOptional(PravegaClientConfig.MAX_CONNECTION_PER_SEGMENT_STORE).ifPresent(builder::maxConnectionsPerSegmentStore);
pravegaClientConfig.getOptional(PravegaClientConfig.ENABLE_TLS_TO_CONTROLLER).ifPresent(builder::enableTlsToController);
pravegaClientConfig.getOptional(PravegaClientConfig.ENABLE_TLS_TO_SEGMENT_STORE).ifPresent(builder::enableTlsToSegmentStore);
return builder.build();
}

/**
* Get configuration from command line and system environment.
*
* @param params Command line params from {@link ParameterTool#fromArgs(String[])}
* @return A Pravega client configuration.
*/
public static Configuration getConfigFromEnvironmentAndCommand(@Nullable ParameterTool params) {
Configuration pravegaClientConfig = new Configuration();

Properties properties = System.getProperties();
Map<String, String> env = System.getenv();

if (params != null && params.has("controller")) {
pravegaClientConfig.set(PravegaClientConfig.CONTROLLER_URI, params.get("controller"));
}
if (properties != null && properties.containsKey("pravega.controller.uri")) {
pravegaClientConfig.set(PravegaClientConfig.CONTROLLER_URI, properties.getProperty("pravega.controller.uri"));
}
if (env != null && env.containsKey("PRAVEGA_CONTROLLER_URI")) {
pravegaClientConfig.set(PravegaClientConfig.CONTROLLER_URI, env.get("PRAVEGA_CONTROLLER_URI"));
}

if (params != null && params.has("scope")) {
pravegaClientConfig.set(PravegaClientConfig.DEFAULT_SCOPE, params.get("scope"));
}
if (properties != null && properties.containsKey("pravega.scope")) {
pravegaClientConfig.set(PravegaClientConfig.DEFAULT_SCOPE, properties.getProperty("pravega.scope"));
}
if (env != null && env.containsKey("PRAVEGA_SCOPE")) {
pravegaClientConfig.set( PravegaClientConfig.DEFAULT_SCOPE, env.get("PRAVEGA_CONTROLLER_URI"));
}

return pravegaClientConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.pravega.connectors.flink.source.split.PravegaSplitSerializer;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
Expand All @@ -45,6 +44,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -85,10 +85,10 @@ public class PravegaSource<T>
final DeserializationSchema<T> deserializationSchema;

// the timeout for reading events from Pravega
final Time eventReadTimeout;
final Duration eventReadTimeout;

// the timeout for call that initiates the Pravega checkpoint
final Time checkpointInitiateTimeout;
final Duration checkpointInitiateTimeout;

// flag to enable/disable metrics
final boolean enableMetrics;
Expand All @@ -110,7 +110,7 @@ public class PravegaSource<T>
public PravegaSource(ClientConfig clientConfig,
ReaderGroupConfig readerGroupConfig, String scope, String readerGroupName,
DeserializationSchema<T> deserializationSchema,
Time eventReadTimeout, Time checkpointInitiateTimeout,
Duration eventReadTimeout, Duration checkpointInitiateTimeout,
boolean enableMetrics) {
this.clientConfig = Preconditions.checkNotNull(clientConfig, "clientConfig");
this.readerGroupConfig = Preconditions.checkNotNull(readerGroupConfig, "readerGroupConfig");
Expand All @@ -131,8 +131,8 @@ public Boundedness getBoundedness() {
public SourceReader<T, PravegaSplit> createReader(SourceReaderContext readerContext) {
Supplier<PravegaSplitReader> splitReaderSupplier =
() ->
new PravegaSplitReader(scope, clientConfig,
readerGroupName, readerContext.getIndexOfSubtask());
new PravegaSplitReader(scope, clientConfig, readerGroupName,
readerContext.getIndexOfSubtask(), this.eventReadTimeout);

return new PravegaSourceReader<>(
splitReaderSupplier,
Expand All @@ -150,7 +150,8 @@ public SplitEnumerator<PravegaSplit, Checkpoint> createEnumerator(
this.readerGroupName,
this.clientConfig,
this.readerGroupConfig,
null);
null,
this.checkpointInitiateTimeout);
}

@Override
Expand All @@ -163,7 +164,8 @@ public SplitEnumerator<PravegaSplit, Checkpoint> restoreEnumerator(
this.readerGroupName,
this.clientConfig,
this.readerGroupConfig,
checkpoint);
checkpoint,
this.checkpointInitiateTimeout);

}

Expand Down
Loading