diff --git a/build.gradle b/build.gradle
index 7234dfe36..18af188af 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3,10 +3,15 @@
 apply from: file('gradle/environment.gradle')
 apply from: file("gradle/dependency-versions.gradle")
 apply from: file("gradle/install-git-hooks.gradle")
+apply plugin: 'com.palantir.docker'
+
 buildscript {
   repositories {
     mavenCentral()
   }
+  dependencies {
+    classpath "com.palantir.docker:com.palantir.docker.gradle.plugin:0.26.0"
+  }
   apply from: file('gradle/buildscript.gradle'), to: buildscript
 }
 
@@ -18,7 +23,6 @@ allprojects {
   apply plugin: 'project-report'
   apply plugin: 'checkstyle'
   apply plugin: 'findbugs'
-
   repositories {
     mavenCentral()
     jcenter()
@@ -44,6 +48,14 @@ idea {
   }
 }
 
+docker {
+  name 'brooklin'
+  tags 'latest' // deprecated, use 'tag'
+  dockerfile file('docker/Dockerfile')
+  files file("${project.rootDir}/datastream-tools/build/distributions/${rootProject.name}-${rootProject.version}.tgz")
+  buildArgs([VERSION: rootProject.version])
+}
+
 subprojects {
   apply plugin: 'java'
   apply plugin: 'pegasus'
@@ -395,6 +407,7 @@ project(':datastream-tools') {
     from(project(':datastream-kafka-connector').configurations.runtime) { into("libs/") }
     duplicatesStrategy 'exclude'
   }
+
   tasks.create(name: "copyDependentLibs", type: Copy) {
     from (configurations.runtime) {
     }
diff --git a/datastream-common/src/main/java/com/linkedin/datastream/common/logging/MdcContextAware.java b/datastream-common/src/main/java/com/linkedin/datastream/common/logging/MdcContextAware.java
new file mode 100644
index 000000000..b5d61a397
--- /dev/null
+++ b/datastream-common/src/main/java/com/linkedin/datastream/common/logging/MdcContextAware.java
@@ -0,0 +1,19 @@
+/**
+ * Copyright 2021 Wayfair LLC. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.common.logging;
+
+import java.util.Map;
+
+/**
+ * Interface for MDC support
+ */
+public interface MdcContextAware {
+  /**
+   * Returns the MDC context for this object
+   * @return the MDC context map
+   */
+  Map<String, String> getContextMap();
+}
diff --git a/datastream-common/src/main/java/com/linkedin/datastream/common/logging/MdcUtils.java b/datastream-common/src/main/java/com/linkedin/datastream/common/logging/MdcUtils.java
new file mode 100644
index 000000000..012657a64
--- /dev/null
+++ b/datastream-common/src/main/java/com/linkedin/datastream/common/logging/MdcUtils.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2021 Wayfair LLC. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.common.logging;
+
+import java.util.Map;
+
+import org.slf4j.MDC;
+
+/**
+ * Utility class for MDC support
+ */
+public final class MdcUtils {
+  /**
+   * Helper method to merge the given MDC context into the current thread's MDC
+   * @param ctx {@link MdcContextAware}
+   */
+  public static void mergeMdcContext(MdcContextAware ctx) {
+    for (Map.Entry<String, String> entry : ctx.getContextMap().entrySet()) {
+      MDC.put(entry.getKey(), entry.getValue());
+    }
+  }
+  /**
+   * Helper method to set up the MDC context from an {@link MdcContextAware} object
+   * @param ctx {@link MdcContextAware}
+   */
+  public static void setMdcContext(MdcContextAware ctx) {
+    MDC.clear();
+    MDC.setContextMap(ctx.getContextMap());
+  }
+
+  /**
+   * Helper method to set up the MDC context from a map
+   * @param ctxMap the MDC context map
+   */
+  public static void setMdcContext(Map<String, String> ctxMap) {
+    MDC.clear();
+    if (ctxMap != null) {
+      MDC.setContextMap(ctxMap);
+    }
+  }
+
+  /**
+   * Helper method to carry the MDC from the current thread to a new thread
+   * @param runnable {@link Runnable}
+   * @return a {@link Runnable} that re-applies the captured MDC context before running
+   */
+  public static Runnable withMdc(Runnable runnable) {
+    Map<String, String> map = MDC.getCopyOfContextMap();
+    return () -> {
+      MDC.setContextMap(map);
+      runnable.run();
+    };
+  }
+}
diff --git a/datastream-common/src/main/java/com/linkedin/datastream/common/translator/ResultSetTranslator.java b/datastream-common/src/main/java/com/linkedin/datastream/common/translator/ResultSetTranslator.java
index afe1be576..8e845dda8 100644
--- a/datastream-common/src/main/java/com/linkedin/datastream/common/translator/ResultSetTranslator.java
+++ b/datastream-common/src/main/java/com/linkedin/datastream/common/translator/ResultSetTranslator.java
@@ -15,7 +15,6 @@
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.function.Function;
 
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -234,12 +233,13 @@ public Schema translateSchemaToInternalFormat(ResultSet rs) throws SQLException
     for (int i = 1; i <= nrOfColumns; i++) {
       /**
        * as per jdbc 4 specs, getColumnLabel will have the alias for the column, if not it will have the column name.
-       * so it may be a better option to check for columnlabel first and if in case it is null is someimplementation,
+       * so it may be a better option to check for columnlabel first and if in case it is null in some implementation,
        * check for alias. Postgres is the one that has the null column names for calculated fields.
        */
       String nameOrLabel = StringUtils.isNotEmpty(meta.getColumnLabel(i)) ? meta.getColumnLabel(i) : meta.getColumnName(i);
       String columnName = nameOrLabel;
       String sqlType = null;
+      boolean isColumnNullable = meta.isNullable(i) == ResultSetMetaData.columnNullable;
       switch (meta.getColumnType(i)) {
         case CHAR:
         case LONGNVARCHAR:
@@ -251,25 +251,29 @@ public Schema translateSchemaToInternalFormat(ResultSet rs) throws SQLException
         case NCLOB:
         case OTHER:
         case Types.SQLXML:
-          builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
-          break;
+
+        // java.sql.RowId is an interface; it seems to be database
+        // implementation specific, let's convert to String
+        case ROWID:
+          buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.STRING);
+          break;
 
         case BIT:
         case BOOLEAN:
-          builder.name(columnName).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
+          buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.BOOLEAN);
           break;
 
         case INTEGER:
           if (meta.isSigned(i) || (meta.getPrecision(i) > 0 && meta.getPrecision(i) < MAX_DIGITS_IN_INT)) {
-            builder.name(columnName).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
+            buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.INT);
           } else {
-            builder.name(columnName).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
+            buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.LONG);
           }
           break;
 
         case SMALLINT:
         case TINYINT:
-          builder.name(columnName).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
+          buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.INT);
           break;
 
         case BIGINT:
@@ -278,27 +282,21 @@ public Schema translateSchemaToInternalFormat(ResultSet rs) throws SQLException
           // to strings as necessary
           int precision = meta.getPrecision(i);
           if (precision < 0 || precision > MAX_DIGITS_IN_BIGINT) {
-            builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+            buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.STRING);
           } else {
-            builder.name(columnName).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
+            buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.LONG);
           }
           break;
 
-        // java.sql.RowId is interface, is seems to be database
-        // implementation specific, let's convert to String
-        case ROWID:
-          builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
-          break;
-
         case FLOAT:
         case REAL:
         case 100: //Oracle BINARY_FLOAT type
-          builder.name(columnName).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault();
+          buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.FLOAT);
           break;
 
         case DOUBLE:
         case 101: //Oracle BINARY_DOUBLE type
-          builder.name(columnName).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
+          buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.DOUBLE);
           break;
 
         // Since Avro 1.8, LogicalType is supported.
@@ -323,25 +321,20 @@ public Schema translateSchemaToInternalFormat(ResultSet rs) throws SQLException
             decimalScale = meta.getScale(i) > 0 ? meta.getScale(i) : DEFAULT_SCALE_VALUE;
           }
           final LogicalTypes.Decimal decimal = LogicalTypes.decimal(decimalPrecision, decimalScale);
-          addNullableField(builder, columnName,
-              u -> u.type(decimal.addToSchema(SchemaBuilder.builder().bytesType())));
-
+          buildColumnSchema(builder, columnName, isColumnNullable, decimal.addToSchema(SchemaBuilder.builder().bytesType()));
           break;
 
         case DATE:
-          addNullableField(builder, columnName,
-              u -> u.type(LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType())));
+          buildColumnSchema(builder, columnName, isColumnNullable, LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()));
           break;
 
         case TIME:
-          addNullableField(builder, columnName,
-              u -> u.type(LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType())));
+          buildColumnSchema(builder, columnName, isColumnNullable, LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType()));
           break;
 
         case -101: // Oracle's TIMESTAMP WITH TIME ZONE
         case -102: // Oracle's TIMESTAMP WITH LOCAL TIME ZONE
-          addNullableField(builder, columnName,
-              u -> u.type(LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType())));
+          buildColumnSchema(builder, columnName, isColumnNullable, LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType()));
           break;
 
         case TIMESTAMP:
@@ -358,7 +351,7 @@ public Schema translateSchemaToInternalFormat(ResultSet rs) throws SQLException
           if (sqlType != null) {
             timestampMilliType.addProp(SOURCE_SQL_DATA_TYPE, sqlType);
           }
-          builder.name(columnName).type().unionOf().nullBuilder().endNull().and().type(timestampMilliType).endUnion().nullDefault();
+          buildColumnSchema(builder, columnName, isColumnNullable, timestampMilliType);
           break;
 
         case BINARY:
@@ -366,7 +359,7 @@ public Schema translateSchemaToInternalFormat(ResultSet rs) throws SQLException
         case LONGVARBINARY:
         case ARRAY:
         case BLOB:
-          builder.name(columnName).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault();
+          buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.BYTES);
           break;
 
@@ -379,14 +372,22 @@ public Schema translateSchemaToInternalFormat(ResultSet rs) throws SQLException
     return builder.endRecord();
   }
 
-  private static void addNullableField(
-      SchemaBuilder.FieldAssembler<Schema> builder,
-      String columnName,
-      Function<SchemaBuilder.BaseTypeBuilder<SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>>,
-          SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>> func
-  ) {
-    final SchemaBuilder.BaseTypeBuilder<SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>>
-        and = builder.name(columnName).type().unionOf().nullType().and();
-    func.apply(and).endUnion().noDefault();
+  /**
+   * helper method to build the avro schema for a given column
+   * @param builder the schema builder
+   * @param columnName name of the column
+   * @param isNullable indicate if the column is nullable
+   * @param type {@link Schema.Type} column type in avro
+   */
+  private static void buildColumnSchema(SchemaBuilder.FieldAssembler<Schema> builder, String columnName, boolean isNullable, Schema.Type type) {
+    buildColumnSchema(builder, columnName, isNullable, Schema.create(type));
+  }
+
+  private static void buildColumnSchema(SchemaBuilder.FieldAssembler<Schema> builder, String columnName, boolean isNullable, Schema type) {
+    if (isNullable) {
+      builder.name(columnName).type().unionOf().nullBuilder().endNull().and().type(type).endUnion().noDefault();
+    } else {
+      builder.name(columnName).type(type).noDefault();
+    }
   }
 }
diff --git a/datastream-jdbc-connector/src/main/java/com/linkedin/datastream/connectors/jdbc/JDBCConnector.java b/datastream-jdbc-connector/src/main/java/com/linkedin/datastream/connectors/jdbc/JDBCConnector.java
index 53b312d22..604cbc084 100644
--- a/datastream-jdbc-connector/src/main/java/com/linkedin/datastream/connectors/jdbc/JDBCConnector.java
+++ b/datastream-jdbc-connector/src/main/java/com/linkedin/datastream/connectors/jdbc/JDBCConnector.java
@@ -6,6 +6,7 @@
 package com.linkedin.datastream.connectors.jdbc;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -14,10 +15,11 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
+import java.util.stream.Collectors;
 import javax.sql.DataSource;
 
 import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -26,7 +28,9 @@
 import com.linkedin.data.template.StringMap;
 import com.linkedin.datastream.common.Datastream;
 import com.linkedin.datastream.common.DatastreamMetadataConstants;
+import com.linkedin.datastream.common.DatastreamRuntimeException;
 import com.linkedin.datastream.common.VerifiableProperties;
+import com.linkedin.datastream.metrics.DynamicMetricsManager;
 import com.linkedin.datastream.server.DatastreamTask;
 import com.linkedin.datastream.server.api.connector.Connector;
 import com.linkedin.datastream.server.api.connector.DatastreamValidationException;
@@ -59,6 +63,8 @@ public class JDBCConnector implements Connector {
   private static final int DEFAULT_MAX_POLL_RECORDS = 10000;
   private static final int DEFAULT_MAX_FETCH_SIZE = 1000;
+  public static final String METRIC_NUMBER_OF_TASK_START_SUCCESS = "numberOfTaskStartSuccess";
+  public static final String METRIC_NUMBER_OF_TASK_START_FAILURE = "numberOfTaskStartFailure";
 
   private final ConcurrentMap<String, DataSource> _datasources;
   private final Map<DatastreamTask, JDBCConnectorTask> _jdbcConnectorTasks;
@@ -94,7 +100,7 @@ public JDBCConnector(VerifiableProperties config) {
       _jdbcUserPassword = Passwords.get(config.getString(CONFIG_JDBC_CREDENTIAL_NAME));
     } catch (IOException e) {
       _logger.error("Unable to decrypt password.");
-      throw new RuntimeException(e);
+      throw new DatastreamRuntimeException("Unable to decrypt password.", e);
     }
 
     _cpMinIdle = config.getInt(CONFIG_CP_MIN_IDLE, 1);
@@ -128,78 +134,112 @@ public synchronized void onAssignmentChange(List tasks) {
       _jdbcConnectorTasks.remove(t);
     });
 
+    boolean havingFailures = false;
+    List<Exception> exceptions = new ArrayList<>();
+    List<DatastreamTask> failureTasks = new ArrayList<>();
+
     for (DatastreamTask task : tasks) {
       if (!_jdbcConnectorTasks.containsKey(task)) {
-        _logger.info("Creating JDBC connector task for " + task);
-
-        String connString = task.getDatastreamSource().getConnectionString();
-
-        DataSource dataSource = _datasources.computeIfAbsent(
-            connString,
-            k -> {
-              BasicDataSource ds = new BasicDataSource();
-              ds.setUrl(connString);
-              ds.setUsername(_jdbcUser);
-              ds.setPassword(_jdbcUserPassword);
-              ds.setMinIdle(_cpMinIdle);
-              ds.setMaxIdle(_cpMaxIdle);
-              return ds;
-            }
-        );
-
-        StringMap metadata = task.getDatastreams().get(0).getMetadata();
-        JDBCConnectorTask.JDBCConnectorTaskBuilder builder = new JDBCConnectorTask.JDBCConnectorTaskBuilder();
-        builder.setDatastreamName(task.getDatastreams().get(0).getName())
-            .setEventProducer(task.getEventProducer())
-            .setDataSource(dataSource)
-            .setPollFrequencyMS(Long.parseLong(metadata.get(DS_CONFIG_POLL_FREQUENCY_MS)))
-            .setDestinationTopic(metadata.get(DS_CONFIG_DESTINATION_TOPIC))
-            .setCheckpointStoreURL(_checkpointStoreUrl)
-            .setCheckpointStoreTopic(_checkpointStoreTopic);
-
-        if (metadata.containsKey(DS_CONFIG_QUERY)) {
-          builder.setQuery(metadata.get(DS_CONFIG_QUERY));
+        _logger.info("Creating JDBC connector task for {}", task);
+
+        JDBCConnectorTask jdbcConnectorTask = getJdbcConnectorTask(task);
+        try {
+          // if something goes wrong during task start,
+          // ignore the exception and make sure it does not block the remaining tasks from starting
+          jdbcConnectorTask.start();
+          _jdbcConnectorTasks.put(task, jdbcConnectorTask);
+
+          recordTaskStartupMetric(task, true);
+        } catch (Exception e) {
+          _logger.warn("exception caught during task start: {}", jdbcConnectorTask, e);
+          havingFailures = true;
+          failureTasks.add(task);
+          exceptions.add(e);
+
+          recordTaskStartupMetric(task, false);
+        }
+      }
+    }
+    if (havingFailures) {
+      throw new DatastreamRuntimeException("Failed to start jdbc tasks: " + failureTasks + ", exceptions: " +
+          exceptions.stream().map(Exception::getMessage).collect(Collectors.joining(",")));
+    }
+  }
 
-        if (metadata.containsKey(DS_CONFIG_INCREMENTING_COLUMN_NAME)) {
-          builder.setIncrementingColumnName(metadata.get(DS_CONFIG_INCREMENTING_COLUMN_NAME));
-        }
+  private void recordTaskStartupMetric(DatastreamTask task, boolean success) {
+    DynamicMetricsManager.getInstance().createOrUpdateMeter(
+        this.getClass().getSimpleName(),
+        task.getDatastreamTaskName(),
+        success ? METRIC_NUMBER_OF_TASK_START_SUCCESS : METRIC_NUMBER_OF_TASK_START_FAILURE,
+        1
+    );
+  }
 
-        if (metadata.containsKey(DS_CONFIG_INCREMENTING_INITIAL)) {
-          builder.setIncrementingInitial(Long.parseLong(metadata.get(DS_CONFIG_INCREMENTING_INITIAL)));
+  private JDBCConnectorTask getJdbcConnectorTask(DatastreamTask task) {
+    String connString = task.getDatastreamSource().getConnectionString();
+
+    DataSource dataSource = _datasources.computeIfAbsent(
+        connString,
+        k -> {
+          BasicDataSource ds = new BasicDataSource();
+          ds.setUrl(connString);
+          ds.setUsername(_jdbcUser);
+          ds.setPassword(_jdbcUserPassword);
+          ds.setMinIdle(_cpMinIdle);
+          ds.setMaxIdle(_cpMaxIdle);
+          return ds;
         }
+    );
+
+    StringMap metadata = task.getDatastreams().get(0).getMetadata();
+    JDBCConnectorTask.JDBCConnectorTaskBuilder builder = new JDBCConnectorTask.JDBCConnectorTaskBuilder();
+    builder.setDatastreamName(task.getDatastreams().get(0).getName())
+        .setEventProducer(task.getEventProducer())
+        .setDataSource(dataSource)
+        .setPollFrequencyMS(Long.parseLong(metadata.get(DS_CONFIG_POLL_FREQUENCY_MS)))
+        .setDestinationTopic(metadata.get(DS_CONFIG_DESTINATION_TOPIC))
+        .setCheckpointStoreURL(_checkpointStoreUrl)
+        .setCheckpointStoreTopic(_checkpointStoreTopic);
+
+    if (metadata.containsKey(DS_CONFIG_QUERY)) {
+      builder.setQuery(metadata.get(DS_CONFIG_QUERY));
+    }
 
-        if (metadata.containsKey(DS_CONFIG_TABLE)) {
-          builder.setTable(metadata.get(DS_CONFIG_TABLE));
-        }
+    if (metadata.containsKey(DS_CONFIG_INCREMENTING_COLUMN_NAME)) {
+      builder.setIncrementingColumnName(metadata.get(DS_CONFIG_INCREMENTING_COLUMN_NAME));
+    }
 
-        if (metadata.containsKey(DS_CONFIG_MAX_POLL_ROWS)) {
-          try {
-            builder.setMaxPollRows(Integer.parseInt(metadata.get(DS_CONFIG_MAX_POLL_ROWS)));
-          } catch (NumberFormatException e) {
-            _logger.warn(DS_CONFIG_MAX_POLL_ROWS + " config value is not a valid number. Using the default value " + DEFAULT_MAX_POLL_RECORDS);
-            builder.setMaxPollRows(DEFAULT_MAX_POLL_RECORDS);
-          }
-        } else {
-          builder.setMaxPollRows(DEFAULT_MAX_POLL_RECORDS);
-        }
+    if (metadata.containsKey(DS_CONFIG_INCREMENTING_INITIAL)) {
+      builder.setIncrementingInitial(Long.parseLong(metadata.get(DS_CONFIG_INCREMENTING_INITIAL)));
+    }
 
-        if (metadata.containsKey(DS_CONFIG_MAX_FETCH_SIZE)) {
-          try {
-            builder.setMaxFetchSize(Integer.parseInt(metadata.get(DS_CONFIG_MAX_FETCH_SIZE)));
-          } catch (NumberFormatException e) {
-            _logger.warn(DS_CONFIG_MAX_FETCH_SIZE + " config value is not a valid number. Using the default value " + DEFAULT_MAX_FETCH_SIZE);
-            builder.setMaxFetchSize(DEFAULT_MAX_FETCH_SIZE);
-          }
-        } else {
-          builder.setMaxFetchSize(DEFAULT_MAX_FETCH_SIZE);
-        }
+    if (metadata.containsKey(DS_CONFIG_TABLE)) {
+      builder.setTable(metadata.get(DS_CONFIG_TABLE));
+    }
+
+    if (metadata.containsKey(DS_CONFIG_MAX_POLL_ROWS)) {
+      try {
+        builder.setMaxPollRows(Integer.parseInt(metadata.get(DS_CONFIG_MAX_POLL_ROWS)));
+      } catch (NumberFormatException e) {
+        _logger.warn(DS_CONFIG_MAX_POLL_ROWS + " config value is not a valid number. Using the default value " + DEFAULT_MAX_POLL_RECORDS);
+        builder.setMaxPollRows(DEFAULT_MAX_POLL_RECORDS);
+      }
+    } else {
+      builder.setMaxPollRows(DEFAULT_MAX_POLL_RECORDS);
+    }
 
-        JDBCConnectorTask jdbcConnectorTask = builder.build();
-        _jdbcConnectorTasks.put(task, jdbcConnectorTask);
-        jdbcConnectorTask.start();
+    if (metadata.containsKey(DS_CONFIG_MAX_FETCH_SIZE)) {
+      try {
+        builder.setMaxFetchSize(Integer.parseInt(metadata.get(DS_CONFIG_MAX_FETCH_SIZE)));
+      } catch (NumberFormatException e) {
+        _logger.warn(DS_CONFIG_MAX_FETCH_SIZE + " config value is not a valid number. Using the default value " + DEFAULT_MAX_FETCH_SIZE);
+        builder.setMaxFetchSize(DEFAULT_MAX_FETCH_SIZE);
      }
+    } else {
+      builder.setMaxFetchSize(DEFAULT_MAX_FETCH_SIZE);
    }
+
+    return builder.build();
   }
 
   @Override
@@ -229,8 +269,8 @@ public void initializeDatastream(Datastream stream, List allDatastre
       throw new DatastreamValidationException(DS_CONFIG_INCREMENTING_INITIAL + " config value is not an integer");
     }
 
-    if ((!metadata.containsKey(DS_CONFIG_QUERY) || metadata.get(DS_CONFIG_QUERY).length() == 0) &&
-        (!metadata.containsKey(DS_CONFIG_TABLE) || metadata.get(DS_CONFIG_TABLE).length() == 0)) {
+    if ((!metadata.containsKey(DS_CONFIG_QUERY) || StringUtils.isEmpty(metadata.get(DS_CONFIG_QUERY))) &&
+        (!metadata.containsKey(DS_CONFIG_TABLE) || StringUtils.isEmpty(metadata.get(DS_CONFIG_TABLE)))) {
       throw new DatastreamValidationException("One of the two config options " + DS_CONFIG_QUERY + " and "
           + DS_CONFIG_TABLE + " must be provided.");
     }
@@ -239,4 +279,4 @@ public void initializeDatastream(Datastream stream, List allDatastre
 
     stream.setMetadata(metadata);
   }
-}
\ No newline at end of file
+}
diff --git a/datastream-jdbc-connector/src/main/java/com/linkedin/datastream/connectors/jdbc/JDBCConnectorTask.java b/datastream-jdbc-connector/src/main/java/com/linkedin/datastream/connectors/jdbc/JDBCConnectorTask.java
index 8ed6a58ae..1a2b7be5c 100644
--- a/datastream-jdbc-connector/src/main/java/com/linkedin/datastream/connectors/jdbc/JDBCConnectorTask.java
+++ b/datastream-jdbc-connector/src/main/java/com/linkedin/datastream/connectors/jdbc/JDBCConnectorTask.java
@@ -12,6 +12,7 @@
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -20,14 +21,18 @@ import javax.sql.DataSource;
 
 import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.linkedin.datastream.common.BrooklinEnvelope;
 import com.linkedin.datastream.common.BrooklinEnvelopeMetadataConstants;
 import com.linkedin.datastream.common.DatastreamRecordMetadata;
+import com.linkedin.datastream.common.logging.MdcContextAware;
+import com.linkedin.datastream.common.logging.MdcUtils;
 import com.linkedin.datastream.common.translator.LongTranslator;
 import com.linkedin.datastream.common.translator.ResultSetTranslator;
+import com.linkedin.datastream.metrics.DynamicMetricsManager;
 import com.linkedin.datastream.server.DatastreamEventProducer;
 import com.linkedin.datastream.server.DatastreamProducerRecordBuilder;
 import com.linkedin.datastream.server.FlushlessEventProducerHandler;
@@ -39,9 +44,11 @@
 /**
  * Connector task that reads data from SQL server and streams to the specified transportprovider.
  */
-public class JDBCConnectorTask {
+public class JDBCConnectorTask implements MdcContextAware {
   private static final Logger _logger = LoggerFactory.getLogger(JDBCConnectorTask.class);
+  public static final String METRIC_NUMBER_OF_PRODUCER_FAILURE = "numberOfProducerFailure";
+  public static final String METRIC_NUMBER_OF_ROWS_PROCESSED = "numberOfRowsProcessed";
 
   private final ScheduledExecutorService _scheduler = Executors.newScheduledThreadPool(1);
   private final String _datastreamName;
@@ -117,12 +124,25 @@ private void processResults(ResultSet resultSet) throws SQLException, IOExceptio
         if (exception != null) {
           _resetToSafeCommit.set(true);
           _logger.warn("failed to send row {}. {}", checkpoint, exception);
+
+          recordJdbcTaskMetrics(METRIC_NUMBER_OF_PRODUCER_FAILURE);
+        } else {
+          recordJdbcTaskMetrics(METRIC_NUMBER_OF_ROWS_PROCESSED);
         }
       });
       _committingCheckpoint = checkpoint;
     }
   }
 
+  private void recordJdbcTaskMetrics(String numberOfProducerFailure) {
+    DynamicMetricsManager.getInstance().createOrUpdateMeter(
+        this.getClass().getSimpleName(),
+        this._datastreamName,
+        numberOfProducerFailure,
+        1
+    );
+  }
+
   private synchronized void mayCommitCheckpoint() {
     Optional<Long> safeCheckpoint = _flushlessProducer.getAckCheckpoint(_id, 0);
     if (safeCheckpoint.isPresent()) {
@@ -142,9 +162,17 @@ private Long getInitialCheckpoint() {
 
   private String generateStatement() {
     String suffix = " WHERE " + _incrementingColumnName + " > ? ORDER BY " + _incrementingColumnName + " ASC";
-    return (_query != null) ?
-        _query + suffix :
-        "SELECT * FROM " + _table + " WITH (NOLOCK)" + suffix;
+    if (StringUtils.isNotBlank(_query)) {
+      // Assume the query is complete if it already contains a WHERE clause
+      if (_query.toUpperCase().contains("WHERE")) {
+        return _query;
+      }
+      return _query + suffix;
+    }
+    if (StringUtils.isBlank(_table)) {
+      throw new IllegalArgumentException("table name cannot be empty!");
+    }
+    return "SELECT * FROM " + _table + " WITH (NOLOCK)" + suffix;
   }
 
   private void poll() {
@@ -155,7 +183,7 @@ private void poll() {
 
       if (_committingCheckpoint == null) {
         // try getting checkpoint from CheckpointProvider.
         _committingCheckpoint = _checkpointProvider.getSafeCheckpoint(
-            () -> new JDBCCheckpoint.Deserializer(),
+            JDBCCheckpoint.Deserializer::new,
             JDBCCheckpoint.class);
       }
@@ -176,10 +204,18 @@ private void poll() {
           }
         }
       }
     } catch (SQLException | IOException e) {
-      _logger.warn("Failed to poll for datastream {} {}", _datastreamName, e);
+      _logger.warn("Failed to poll for datastream {}", _datastreamName, e);
     }
   }
 
+  @Override
+  public Map<String, String> getContextMap() {
+    Map<String, String> map = new HashMap<>();
+    map.put("datastream", this._datastreamName);
+    map.put("id", this._id);
+    return map;
+  }
+
   /**
    * start the task
    */
@@ -188,9 +224,10 @@ public void start() {
     this._checkpointProvider = new KafkaCustomCheckpointProvider<>(_id, _checkpointStorerURL, _checkpointStoreTopic);
     _scheduler.scheduleWithFixedDelay(() -> {
       try {
+        MdcUtils.setMdcContext(this.getContextMap());
        this.poll();
       } catch (Exception e) {
-        _logger.warn("Failed poll. {}", e);
+        _logger.error("Failed poll datastream: {}", _datastreamName, e);
       }
     },
     _pollFrequencyMS,
@@ -375,4 +412,15 @@ public JDBCConnectorTask build() {
       return new JDBCConnectorTask(this);
     }
   }
+
+  @Override
+  public String toString() {
+    return String.format("JDBCConnectorTask[id:%s] - datastream name: %s, incrementing column name: %s, table: %s, query: %s, destination topic: %s",
+        this._id,
+        this._datastreamName,
+        this._incrementingColumnName,
+        this._table,
+        this._query,
+        this._destinationTopic);
+  }
 }
diff --git a/datastream-jdbc-connector/src/main/java/com/linkedin/datastream/connectors/jdbc/cdc/SQLServerCDCConnector.java b/datastream-jdbc-connector/src/main/java/com/linkedin/datastream/connectors/jdbc/cdc/SQLServerCDCConnector.java
index a68db731b..ff30bcd84 100644
--- a/datastream-jdbc-connector/src/main/java/com/linkedin/datastream/connectors/jdbc/cdc/SQLServerCDCConnector.java
+++ b/datastream-jdbc-connector/src/main/java/com/linkedin/datastream/connectors/jdbc/cdc/SQLServerCDCConnector.java
@@ -16,7 +16,6 @@ import java.util.concurrent.ConcurrentMap;
 
 import javax.sql.DataSource;
 
-import com.linkedin.datastream.common.DatastreamRuntimeException;
 import org.apache.commons.dbcp.BasicDataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,6 +25,7 @@ import com.linkedin.data.template.StringMap;
 import com.linkedin.datastream.common.Datastream;
 import com.linkedin.datastream.common.DatastreamMetadataConstants;
+import com.linkedin.datastream.common.DatastreamRuntimeException;
 import com.linkedin.datastream.common.VerifiableProperties;
 import com.linkedin.datastream.server.DatastreamTask;
 import com.linkedin.datastream.server.api.connector.Connector;
@@ -232,4 +232,4 @@ public void initializeDatastream(Datastream stream, List allDatastre
 
     stream.setMetadata(metadata);
   }
-}
\ No newline at end of file
+}
diff --git a/datastream-server-api/src/main/java/com/linkedin/datastream/server/providers/JDBCCheckpoint.java b/datastream-server-api/src/main/java/com/linkedin/datastream/server/providers/JDBCCheckpoint.java
index 2fc3d4130..563199110 100644
--- a/datastream-server-api/src/main/java/com/linkedin/datastream/server/providers/JDBCCheckpoint.java
+++ b/datastream-server-api/src/main/java/com/linkedin/datastream/server/providers/JDBCCheckpoint.java
@@ -5,6 +5,7 @@
  */
 package com.linkedin.datastream.server.providers;
 
+import java.nio.charset.StandardCharsets;
 import java.nio.charset.UnsupportedCharsetException;
 import java.util.Arrays;
 
@@ -31,7 +32,7 @@ public JDBCCheckpoint(long offset) {
 
   @Override
   public byte[] serialize() {
-    return Longs.toByteArray(_offset);
+    return String.valueOf(_offset).getBytes(StandardCharsets.UTF_8);
   }
 
   @Override
@@ -82,7 +83,7 @@ public T deserialize(byte[] value, Class ch
     long offset = -1;
     try {
       // Old version of checkpoint uses the string representation of Long. To be compatible still use same format.
-      offset = Longs.fromByteArray(value);
+      offset = Long.parseLong(new String(value, StandardCharsets.UTF_8));
     } catch (NumberFormatException | UnsupportedCharsetException e) {
       throw new DatastreamRuntimeException("Invalid CDC checkpoint offset " + Arrays.toString(value));
     }
diff --git a/datastream-server-restli/src/main/java/com/linkedin/datastream/server/DatastreamServer.java b/datastream-server-restli/src/main/java/com/linkedin/datastream/server/DatastreamServer.java
index 46f3299c9..71e8e84fd 100644
--- a/datastream-server-restli/src/main/java/com/linkedin/datastream/server/DatastreamServer.java
+++ b/datastream-server-restli/src/main/java/com/linkedin/datastream/server/DatastreamServer.java
@@ -137,13 +137,13 @@ public class DatastreamServer {
    */
   public DatastreamServer(Properties properties) throws DatastreamException {
     _properties = properties;
-    _logger.info("Start to initialize DatastreamServer. Properties: " + _properties);
+    _logger.info("Start to initialize DatastreamServer. Properties: {}", _properties);
     _logger.info("Creating coordinator.");
     VerifiableProperties verifiableProperties = new VerifiableProperties(_properties);
 
     HashSet<String> connectorTypes = new HashSet<>(verifiableProperties.getStringList(CONFIG_CONNECTOR_NAMES,
         Collections.emptyList()));
-    if (connectorTypes.size() == 0) {
+    if (connectorTypes.isEmpty()) {
       String errorMessage = "No connectors specified in connectorTypes";
       _logger.error(errorMessage);
       throw new DatastreamRuntimeException(errorMessage);
@@ -151,7 +151,7 @@ public DatastreamServer(Properties properties) throws DatastreamException {
 
     HashSet<String> transportProviderNames = new HashSet<>(verifiableProperties.getStringList(CONFIG_TRANSPORT_PROVIDER_NAMES,
         Collections.emptyList()));
-    if (transportProviderNames.size() == 0) {
+    if (transportProviderNames.isEmpty()) {
       String errorMessage = "No transport providers specified in config: " + CONFIG_TRANSPORT_PROVIDER_NAMES;
       _logger.error(errorMessage);
       throw new DatastreamRuntimeException(errorMessage);
@@ -300,7 +300,7 @@ private void initializeSerde(String serdeName, Properties serdeConfig) {
   }
 
   private void initializeTransportProvider(String transportProviderName, Properties transportProviderConfig) {
-    _logger.info("Starting to load the transport provider: " + transportProviderName);
+    _logger.info("Starting to load the transport provider: {}", transportProviderName);
 
     String factoryClassName = transportProviderConfig.getProperty(CONFIG_FACTORY_CLASS_NAME, "");
     if (StringUtils.isBlank(factoryClassName)) {
@@ -321,7 +321,7 @@ private void initializeTransportProvider(String transportProviderName, Propertie
   }
 
   private void initializeConnector(String connectorName, Properties connectorProperties, String clusterName) {
-    _logger.info("Starting to load connector: " + connectorName);
+    _logger.info("Starting to load connector: {}", connectorName);
 
     VerifiableProperties connectorProps = new VerifiableProperties(connectorProperties);
 
@@ -385,7 +385,7 @@ private void initializeConnector(String connectorName, Properties connectorPrope
     _coordinator.addConnector(connectorName, connectorInstance, assignmentStrategy, customCheckpointing, deduper,
         authorizerName);
-    _logger.info("Connector loaded successfully. Type: " + connectorName);
Type: " + connectorName); + _logger.info("Connector loaded successfully. Type: {}", connectorName); } private void initializeMetrics() { @@ -396,7 +396,7 @@ private void initializeMetrics() { _jmxReporter = JmxReporterFactory.createJmxReporter(METRIC_REGISTRY); if (StringUtils.isNotEmpty(_csvMetricsDir)) { - _logger.info("Starting CsvReporter in " + _csvMetricsDir); + _logger.info("Starting CsvReporter in {}",_csvMetricsDir); File csvDir = new File(_csvMetricsDir); if (!csvDir.exists()) { _logger.info("csvMetricsDir {} doesn't exist, creating it.", _csvMetricsDir); diff --git a/datastream-server-restli/src/main/java/com/linkedin/datastream/server/diagnostics/ServerHealthResources.java b/datastream-server-restli/src/main/java/com/linkedin/datastream/server/diagnostics/ServerHealthResources.java index b508da90f..7102c9d7b 100644 --- a/datastream-server-restli/src/main/java/com/linkedin/datastream/server/diagnostics/ServerHealthResources.java +++ b/datastream-server-restli/src/main/java/com/linkedin/datastream/server/diagnostics/ServerHealthResources.java @@ -60,7 +60,7 @@ public ServerHealthResources(DatastreamServer datastreamServer) { public ServerHealth get() { _logger.info("Get request for serverHealth"); ServerHealth health = buildServerHealth(); - _logger.info("Server Health: " + health.toString()); + _logger.info("Server Health: {}", health); return health; } @@ -122,7 +122,7 @@ private TaskHealthArray buildTasksHealthForConnectorType(String connectorType) { try { checkpointProvider = _server.getCustomCheckpointProvider(task.getDatastreams().get(0)); JDBCCheckpoint checkpoint = checkpointProvider.getSafeCheckpoint( - () -> new JDBCCheckpoint.Deserializer(), + JDBCCheckpoint.Deserializer::new, JDBCCheckpoint.class ); diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/providers/KafkaCustomCheckpointProvider.java b/datastream-server/src/main/java/com/linkedin/datastream/server/providers/KafkaCustomCheckpointProvider.java index d8370e1df..dbc7ad2c3 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/providers/KafkaCustomCheckpointProvider.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/providers/KafkaCustomCheckpointProvider.java @@ -10,9 +10,11 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; +import com.linkedin.datastream.common.DatastreamRuntimeException; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; @@ -95,10 +97,12 @@ public KafkaCustomCheckpointProvider(String taskId, result.values().get(topic).get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new RuntimeException(e); + // assume the topic already exists and ignore this exception + throw new DatastreamRuntimeException(e); } catch (ExecutionException e) { if (!(e.getCause() instanceof TopicExistsException)) { - throw new RuntimeException(e); + _logger.warn("Exception occurred while trying to create the checkpoint kafka topic", e); + throw new DatastreamRuntimeException(e); } } diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 000000000..a17ec7297 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,16 @@ +version: '3' +services: + brooklin: + image: brooklin + environment: + JVM_OPTS: "-Xms1024M -Xmx2048M" + LOG4J_OPTS: 
"-Dlog4j.configuration=file:/opt/app/brooklin/config/log4j.properties" + JMX_PORT: "9990" + DEBUG_OPTS: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND:-n},address=5005" + HOST_IP: "${DOCKER_HOST_IP:-0.0.0.0}" + volumes: + - ${CONFIG_DIR}:/opt/app/brooklin/config + ports: + - "32311:32311" + - "5005:5005" + - "9990:9990" diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 000000000..fa6bab07c --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,16 @@ +FROM wayfair/maven-openjdk8:0.9.2 + +ARG VERSION + +EXPOSE 32311 +EXPOSE 9990 +EXPOSE 5005 + +RUN yum install vim jq net-tools -y +ENV BROOKLIN_TAR_FILE brooklin-${VERSION}.tgz + +COPY ${BROOKLIN_TAR_FILE} /opt/app/ +RUN tar -zxf /opt/app/${BROOKLIN_TAR_FILE} --directory=/opt/app && mv /opt/app/brooklin-${VERSION} /opt/app/brooklin +WORKDIR /opt/app/brooklin + +ENTRYPOINT ["/opt/app/brooklin/bin/brooklin-server-start.sh", "/opt/app/brooklin/config/server.properties"] diff --git a/scripts/brooklin-server-start.sh b/scripts/brooklin-server-start.sh index 33c942a79..8e16b59b0 100755 --- a/scripts/brooklin-server-start.sh +++ b/scripts/brooklin-server-start.sh @@ -15,7 +15,8 @@ if [ "x$HEAP_OPTS" = "x" ]; then export HEAP_OPTS="-Xmx1G -Xms1G" fi -EXTRA_ARGS="-name brooklin -loggc" +# Add the optional debug params, useful for development +EXTRA_ARGS="-name brooklin -loggc ${DEBUG_OPTS} " COMMAND=$1 case $COMMAND in diff --git a/scripts/git/commit-msg b/scripts/git/commit-msg index 75342ab67..dcc274f64 100755 --- a/scripts/git/commit-msg +++ b/scripts/git/commit-msg @@ -102,7 +102,7 @@ def print_error(msg): def print_warning(msg): - print TerminalColors.warn(msg) + print(TerminalColors.warn(msg)) def process_commit_message(msg): diff --git a/scripts/run-class.sh b/scripts/run-class.sh index b8328e08a..ca158f8db 100755 --- a/scripts/run-class.sh +++ b/scripts/run-class.sh @@ -41,7 +41,7 @@ fi # JMX port to use if [ $JMX_PORT ]; then - JMX_OPTS="$JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT " + JMX_OPTS="$JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT -Djava.rmi.server.hostname=${HOST_IP:-0.0.0.0} " fi # Log directory to use