Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate partition decorator syntax feature #21

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -51,11 +54,22 @@
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Base class for connector and task configs; contains properties shared between the two of them.
*/
public class BigQuerySinkConfig extends AbstractConfig {

private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkConfig.class);

public static final String DEPRECATED_DOC = "(DEPRECATED)";
public static final String GCS_LOAD_DEPRECATION_NOTICE =
"GCS batch loading has been deprecated and will be removed in a future major release.";
public static final String DECORATOR_SYNTAX_DEPRECATION_NOTICE =
"Use of partition decorator syntax has been deprecated and will be removed in a future release.";

// Values taken from https://github.com/apache/kafka/blob/1.1.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java#L33
public static final String TOPICS_CONFIG = SinkConnector.TOPICS_CONFIG;
public static final String TOPICS_DEFAULT = "";
Expand Down Expand Up @@ -172,7 +186,7 @@ public class BigQuerySinkConfig extends AbstractConfig {
private static final List<String> ENABLE_BATCH_DEFAULT = Collections.emptyList();
private static final ConfigDef.Importance ENABLE_BATCH_IMPORTANCE = ConfigDef.Importance.LOW;
private static final String ENABLE_BATCH_DOC =
"Beta Feature; use with caution: The sublist of topics to be batch loaded through GCS";
"The sublist of topics to be batch loaded through GCS.";
private static final ConfigDef.Type BATCH_LOAD_INTERVAL_SEC_TYPE = ConfigDef.Type.INT;
private static final Integer BATCH_LOAD_INTERVAL_SEC_DEFAULT = 120;
private static final ConfigDef.Importance BATCH_LOAD_INTERVAL_SEC_IMPORTANCE =
Expand Down Expand Up @@ -535,12 +549,31 @@ public class BigQuerySinkConfig extends AbstractConfig {

protected BigQuerySinkConfig(ConfigDef config, Map<String, String> properties) {
super(config, properties);
logDeprecationWarnings();
}

public BigQuerySinkConfig(Map<String, String> properties) {
this(getConfig(), properties);
}

private void logDeprecationWarnings() {
if (!getList(ENABLE_BATCH_CONFIG).isEmpty()) {
logger.warn(
GCS_LOAD_DEPRECATION_NOTICE
+ " To disable this feature, remove the {} property from the connector configuration",
ENABLE_BATCH_CONFIG
);
}

if (getBoolean(BIGQUERY_PARTITION_DECORATOR_CONFIG)) {
logger.warn(
DECORATOR_SYNTAX_DEPRECATION_NOTICE
+ " To disable this feature, set the {} property to false in the connector configuration",
BIGQUERY_PARTITION_DECORATOR_CONFIG
);
}
}

/**
* Return the ConfigDef object used to define this config's fields.
*
Expand Down Expand Up @@ -573,25 +606,25 @@ public static ConfigDef getConfig() {
ENABLE_BATCH_TYPE,
ENABLE_BATCH_DEFAULT,
ENABLE_BATCH_IMPORTANCE,
ENABLE_BATCH_DOC
deprecatedGcsLoadDoc(ENABLE_BATCH_DOC)
).define(
BATCH_LOAD_INTERVAL_SEC_CONFIG,
BATCH_LOAD_INTERVAL_SEC_TYPE,
BATCH_LOAD_INTERVAL_SEC_DEFAULT,
BATCH_LOAD_INTERVAL_SEC_IMPORTANCE,
BATCH_LOAD_INTERVAL_SEC_DOC
deprecatedGcsLoadDoc(BATCH_LOAD_INTERVAL_SEC_DOC)
).define(
GCS_BUCKET_NAME_CONFIG,
GCS_BUCKET_NAME_TYPE,
GCS_BUCKET_NAME_DEFAULT,
GCS_BUCKET_NAME_IMPORTANCE,
GCS_BUCKET_NAME_DOC
deprecatedGcsLoadDoc(GCS_BUCKET_NAME_DOC)
).define(
GCS_FOLDER_NAME_CONFIG,
GCS_FOLDER_NAME_TYPE,
GCS_FOLDER_NAME_DEFAULT,
GCS_FOLDER_NAME_IMPORTANCE,
GCS_FOLDER_NAME_DOC
deprecatedGcsLoadDoc(GCS_FOLDER_NAME_DOC)
).define(
PROJECT_CONFIG,
PROJECT_TYPE,
Expand Down Expand Up @@ -685,7 +718,7 @@ public static ConfigDef getConfig() {
AUTO_CREATE_BUCKET_TYPE,
AUTO_CREATE_BUCKET_DEFAULT,
AUTO_CREATE_BUCKET_IMPORTANCE,
AUTO_CREATE_BUCKET_DOC
deprecatedGcsLoadDoc(AUTO_CREATE_BUCKET_DOC)
).define(
ALLOW_NEW_BIGQUERY_FIELDS_CONFIG,
ALLOW_NEW_BIGQUERY_FIELDS_TYPE,
Expand Down Expand Up @@ -770,13 +803,13 @@ public static ConfigDef getConfig() {
BIGQUERY_MESSAGE_TIME_PARTITIONING_CONFIG_TYPE,
BIGQUERY_MESSAGE_TIME_PARTITIONING_DEFAULT,
BIGQUERY_MESSAGE_TIME_PARTITIONING_IMPORTANCE,
BIGQUERY_MESSAGE_TIME_PARTITIONING_DOC
deprecatedPartitionSyntaxDoc(BIGQUERY_MESSAGE_TIME_PARTITIONING_DOC)
).define(
BIGQUERY_PARTITION_DECORATOR_CONFIG,
BIGQUERY_PARTITION_DECORATOR_CONFIG_TYPE,
BIGQUERY_PARTITION_DECORATOR_DEFAULT,
BIGQUERY_PARTITION_DECORATOR_IMPORTANCE,
BIGQUERY_PARTITION_DECORATOR_DOC
deprecatedPartitionSyntaxDoc(BIGQUERY_PARTITION_DECORATOR_DOC)
).define(
BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_CONFIG,
BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_TYPE,
Expand Down Expand Up @@ -867,18 +900,6 @@ public boolean visible(String s, Map<String, Object> map) {
);
}

public static boolean upsertDeleteEnabled(Map<String, String> props) {
String upsertStr = props.get(UPSERT_ENABLED_CONFIG);
String deleteStr = props.get(DELETE_ENABLED_CONFIG);
return Boolean.TRUE.toString().equalsIgnoreCase(upsertStr)
|| Boolean.TRUE.toString().equalsIgnoreCase(deleteStr);
}

public static boolean gcsBatchLoadingEnabled(Map<String, String> props) {
String batchLoadStr = props.get(ENABLE_BATCH_CONFIG);
return batchLoadStr != null && !batchLoadStr.isEmpty();
}

/**
* Used in conjunction with {@link com.wepay.kafka.connect.bigquery.BigQuerySinkConnector#validate(Map)} to perform
* preflight configuration checks. Simple validations that only require a single property value at a time (such as
Expand Down Expand Up @@ -924,13 +945,6 @@ public GcpClientBuilder.KeySource getKeySource() {
}
}

/**
* Returns the keyfile
*/
public String getKeyFile() {
return Optional.ofNullable(getPassword(KEYFILE_CONFIG)).map(Password::value).orElse(null);
}

/**
* Return a new instance of the configured Schema Converter.
*
Expand Down Expand Up @@ -1122,4 +1136,16 @@ private static String header(String text) {
return wrapper + "\n" + text + "\n" + wrapper + "\n";
}

private static String deprecatedGcsLoadDoc(String doc) {
return deprecatedDoc(doc, GCS_LOAD_DEPRECATION_NOTICE);
}

public static String deprecatedPartitionSyntaxDoc(String doc) {
return deprecatedDoc(doc, DECORATOR_SYNTAX_DEPRECATION_NOTICE);
}

private static String deprecatedDoc(String doc, String notice) {
return DEPRECATED_DOC + " " + doc + " Warning: " + notice;
}

}
Loading