Skip to content

Commit fbfc6f2

Browse files
committed
Deprecate GCS batch loading feature
1 parent 2b87f11 commit fbfc6f2

File tree

1 file changed

+33
-25
lines changed

1 file changed

+33
-25
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java

+33-25
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,14 @@
3535
import java.lang.reflect.Constructor;
3636
import java.lang.reflect.InvocationTargetException;
3737
import java.util.ArrayList;
38+
import java.util.Arrays;
3839
import java.util.Collections;
3940
import java.util.HashMap;
41+
import java.util.HashSet;
4042
import java.util.List;
4143
import java.util.Map;
4244
import java.util.Optional;
45+
import java.util.Set;
4346
import java.util.function.Function;
4447
import java.util.stream.Collectors;
4548
import java.util.stream.Stream;
@@ -51,11 +54,20 @@
5154
import org.apache.kafka.common.config.types.Password;
5255
import org.apache.kafka.connect.errors.ConnectException;
5356
import org.apache.kafka.connect.sink.SinkConnector;
57+
import org.slf4j.Logger;
58+
import org.slf4j.LoggerFactory;
5459

5560
/**
5661
* Base class for connector and task configs; contains properties shared between the two of them.
5762
*/
5863
public class BigQuerySinkConfig extends AbstractConfig {
64+
65+
private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkConfig.class);
66+
67+
public static final String DEPRECATED_DOC = "(DEPRECATED)";
68+
public static final String GCS_LOAD_DEPRECATION_NOTICE =
69+
"GCS batch loading has been deprecated and will be removed in a future major release.";
70+
5971
// Values taken from https://github.com/apache/kafka/blob/1.1.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java#L33
6072
public static final String TOPICS_CONFIG = SinkConnector.TOPICS_CONFIG;
6173
public static final String TOPICS_DEFAULT = "";
@@ -172,7 +184,7 @@ public class BigQuerySinkConfig extends AbstractConfig {
172184
private static final List<String> ENABLE_BATCH_DEFAULT = Collections.emptyList();
173185
private static final ConfigDef.Importance ENABLE_BATCH_IMPORTANCE = ConfigDef.Importance.LOW;
174186
private static final String ENABLE_BATCH_DOC =
175-
"Beta Feature; use with caution: The sublist of topics to be batch loaded through GCS";
187+
"The sublist of topics to be batch loaded through GCS.";
176188
private static final ConfigDef.Type BATCH_LOAD_INTERVAL_SEC_TYPE = ConfigDef.Type.INT;
177189
private static final Integer BATCH_LOAD_INTERVAL_SEC_DEFAULT = 120;
178190
private static final ConfigDef.Importance BATCH_LOAD_INTERVAL_SEC_IMPORTANCE =
@@ -535,12 +547,23 @@ public class BigQuerySinkConfig extends AbstractConfig {
535547

536548
protected BigQuerySinkConfig(ConfigDef config, Map<String, String> properties) {
537549
super(config, properties);
550+
logDeprecationWarnings();
538551
}
539552

540553
public BigQuerySinkConfig(Map<String, String> properties) {
541554
this(getConfig(), properties);
542555
}
543556

557+
private void logDeprecationWarnings() {
558+
if (!getList(ENABLE_BATCH_CONFIG).isEmpty()) {
559+
logger.warn(
560+
GCS_LOAD_DEPRECATION_NOTICE
561+
+ " To disable this feature, remove the {} property from the connector configuration",
562+
ENABLE_BATCH_CONFIG
563+
);
564+
}
565+
}
566+
544567
/**
545568
* Return the ConfigDef object used to define this config's fields.
546569
*
@@ -573,25 +596,25 @@ public static ConfigDef getConfig() {
573596
ENABLE_BATCH_TYPE,
574597
ENABLE_BATCH_DEFAULT,
575598
ENABLE_BATCH_IMPORTANCE,
576-
ENABLE_BATCH_DOC
599+
deprecatedGcsLoadDoc(ENABLE_BATCH_DOC)
577600
).define(
578601
BATCH_LOAD_INTERVAL_SEC_CONFIG,
579602
BATCH_LOAD_INTERVAL_SEC_TYPE,
580603
BATCH_LOAD_INTERVAL_SEC_DEFAULT,
581604
BATCH_LOAD_INTERVAL_SEC_IMPORTANCE,
582-
BATCH_LOAD_INTERVAL_SEC_DOC
605+
deprecatedGcsLoadDoc(BATCH_LOAD_INTERVAL_SEC_DOC)
583606
).define(
584607
GCS_BUCKET_NAME_CONFIG,
585608
GCS_BUCKET_NAME_TYPE,
586609
GCS_BUCKET_NAME_DEFAULT,
587610
GCS_BUCKET_NAME_IMPORTANCE,
588-
GCS_BUCKET_NAME_DOC
611+
deprecatedGcsLoadDoc(GCS_BUCKET_NAME_DOC)
589612
).define(
590613
GCS_FOLDER_NAME_CONFIG,
591614
GCS_FOLDER_NAME_TYPE,
592615
GCS_FOLDER_NAME_DEFAULT,
593616
GCS_FOLDER_NAME_IMPORTANCE,
594-
GCS_FOLDER_NAME_DOC
617+
deprecatedGcsLoadDoc(GCS_FOLDER_NAME_DOC)
595618
).define(
596619
PROJECT_CONFIG,
597620
PROJECT_TYPE,
@@ -685,7 +708,7 @@ public static ConfigDef getConfig() {
685708
AUTO_CREATE_BUCKET_TYPE,
686709
AUTO_CREATE_BUCKET_DEFAULT,
687710
AUTO_CREATE_BUCKET_IMPORTANCE,
688-
AUTO_CREATE_BUCKET_DOC
711+
deprecatedGcsLoadDoc(AUTO_CREATE_BUCKET_DOC)
689712
).define(
690713
ALLOW_NEW_BIGQUERY_FIELDS_CONFIG,
691714
ALLOW_NEW_BIGQUERY_FIELDS_TYPE,
@@ -867,18 +890,6 @@ public boolean visible(String s, Map<String, Object> map) {
867890
);
868891
}
869892

870-
public static boolean upsertDeleteEnabled(Map<String, String> props) {
871-
String upsertStr = props.get(UPSERT_ENABLED_CONFIG);
872-
String deleteStr = props.get(DELETE_ENABLED_CONFIG);
873-
return Boolean.TRUE.toString().equalsIgnoreCase(upsertStr)
874-
|| Boolean.TRUE.toString().equalsIgnoreCase(deleteStr);
875-
}
876-
877-
public static boolean gcsBatchLoadingEnabled(Map<String, String> props) {
878-
String batchLoadStr = props.get(ENABLE_BATCH_CONFIG);
879-
return batchLoadStr != null && !batchLoadStr.isEmpty();
880-
}
881-
882893
/**
883894
* Used in conjunction with {@link com.wepay.kafka.connect.bigquery.BigQuerySinkConnector#validate(Map)} to perform
884895
* preflight configuration checks. Simple validations that only require a single property value at a time (such as
@@ -924,13 +935,6 @@ public GcpClientBuilder.KeySource getKeySource() {
924935
}
925936
}
926937

927-
/**
928-
* Returns the keyfile
929-
*/
930-
public String getKeyFile() {
931-
return Optional.ofNullable(getPassword(KEYFILE_CONFIG)).map(Password::value).orElse(null);
932-
}
933-
934938
/**
935939
* Return a new instance of the configured Schema Converter.
936940
*
@@ -1122,4 +1126,8 @@ private static String header(String text) {
11221126
return wrapper + "\n" + text + "\n" + wrapper + "\n";
11231127
}
11241128

1129+
private static String deprecatedGcsLoadDoc(String doc) {
1130+
return DEPRECATED_DOC + " " + doc + " Warning: " + GCS_LOAD_DEPRECATION_NOTICE;
1131+
}
1132+
11251133
}

0 commit comments

Comments
 (0)