Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public class ConnectorCredentialsProvider implements CredentialsProvider {
private static final List<String> GCP_SCOPE =
Expand All @@ -38,17 +39,19 @@ private ConnectorCredentialsProvider(CredentialsProvider impl) {
public static ConnectorCredentialsProvider fromConfig(Map<String, Object> config) {
String credentialsPath = config.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG).toString();
String credentialsJson = config.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG).toString();
String credentialsClass = config.get(ConnectorUtils.GCP_CREDENTIALS_CLASS_CONFIG).toString();
long setOptsCount = Stream.of(credentialsPath, credentialsJson, credentialsClass).filter(s -> !s.isEmpty()).count();

if (setOptsCount > 1) {
throw new IllegalArgumentException("More than one of the credentials config are set");
}

if (!credentialsPath.isEmpty()) {
if (!credentialsJson.isEmpty()) {
throw new IllegalArgumentException(
"May not set both "
+ ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG
+ " and "
+ ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
}
return ConnectorCredentialsProvider.fromFile(credentialsPath);
} else if (!credentialsJson.isEmpty()) {
return ConnectorCredentialsProvider.fromJson(credentialsJson);
} else if (!credentialsClass.isEmpty()) {
return ConnectorCredentialsProvider.fromClass(credentialsClass);
} else {
return ConnectorCredentialsProvider.fromDefault();
}
Expand All @@ -68,6 +71,21 @@ public static ConnectorCredentialsProvider fromJson(String credentialsJson) {
.createScoped(GCP_SCOPE));
}

public static ConnectorCredentialsProvider fromClass(String credentialsClass) {
try {
final Class<?> klass = Class.forName(credentialsClass);
final Object obj = klass.getDeclaredConstructor().newInstance();

if (!obj instanceof CredentialsProvider) {
throw new IllegalArgumentException(String.format("Supplied class %s is not a CredentialsProvider", credentialsClass));
}

return new ConnectorCredentialsProvider(() -> ((CredentialsProvider) obj).getCredentials());
} catch (Exception e) {
throw new RuntimeException("Error loading class: " + e);
}
}

public static ConnectorCredentialsProvider fromDefault() {
return new ConnectorCredentialsProvider(
() -> GoogleCredentials.getApplicationDefault().createScoped(GCP_SCOPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class ConnectorUtils {
public static final String CPS_ORDERING_KEY_ATTRIBUTE = "orderingKey";
public static final String GCP_CREDENTIALS_FILE_PATH_CONFIG = "gcp.credentials.file.path";
public static final String GCP_CREDENTIALS_JSON_CONFIG = "gcp.credentials.json";
public static final String GCP_CREDENTIALS_CLASS_CONFIG = "gcp.credentials.class";
public static final String KAFKA_MESSAGE_CPS_BODY_FIELD = "message";
public static final String KAFKA_TOPIC_ATTRIBUTE = "kafka.topic";
public static final String KAFKA_PARTITION_ATTRIBUTE = "kafka.partition";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,12 @@ public ConfigDef config() {
"",
Importance.HIGH,
"GCP JSON credentials")
.define(
ConnectorUtils.GCP_CREDENTIALS_CLASS_CONFIG,
Type.STRING,
"",
Importance.HIGH,
"Name of the class for custom credentials provider")
.define(
ORDERING_KEY_SOURCE,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,12 @@ public ConfigDef config() {
"",
Importance.HIGH,
"GCP JSON credentials")
.define(
ConnectorUtils.GCP_CREDENTIALS_CLASS_CONFIG,
Type.STRING,
"",
Importance.HIGH,
"Name of the class for custom credentials provider")
.define(
USE_KAFKA_HEADERS,
Type.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ static ConfigDef config() {
ConfigDef.Type.STRING,
"",
Importance.HIGH,
"GCP JSON credentials");
"GCP JSON credentials")
.define(
ConnectorUtils.GCP_CREDENTIALS_CLASS_CONFIG,
Type.STRING,
"",
Importance.HIGH,
"Name of the class for custom credentials provider");
}
}