Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ apply from: file('gradle/environment.gradle')
apply from: file("gradle/dependency-versions.gradle")
apply from: file("gradle/install-git-hooks.gradle")

apply plugin: 'com.palantir.docker'

buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath "com.palantir.docker:com.palantir.docker.gradle.plugin:0.26.0"
}
apply from: file('gradle/buildscript.gradle'), to: buildscript
}

Expand All @@ -18,7 +23,6 @@ allprojects {
apply plugin: 'project-report'
apply plugin: 'checkstyle'
apply plugin: 'findbugs'

repositories {
mavenCentral()
jcenter()
Expand All @@ -44,6 +48,14 @@ idea {
}
}

docker {
name 'brooklin'
tags 'latest' // deprecated, use 'tag'
dockerfile file('docker/Dockerfile')
files file("${project.rootDir}/datastream-tools/build/distributions/${rootProject.name}-${rootProject.version}.tgz")
buildArgs([VERSION: rootProject.version])
}

subprojects {
apply plugin: 'java'
apply plugin: 'pegasus'
Expand Down Expand Up @@ -395,6 +407,7 @@ project(':datastream-tools') {
from(project(':datastream-kafka-connector').configurations.runtime) { into("libs/") }
duplicatesStrategy 'exclude'
}

tasks.create(name: "copyDependentLibs", type: Copy) {
from (configurations.runtime) {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Copyright 2021 Wayfair LLC. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.common.logging;

import java.util.Map;

/**
* Interface for MDC Support
*/
public interface MdcContextAware {
/**
* return mdc context
* @return
*/
Map<String, String> getContextMap();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Copyright 2021 Wayfair LLC. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.common.logging;

import java.util.Map;

import org.slf4j.MDC;

/**
* Utility class for MDC support
*/
public final class MdcUtils {
/**
* Helper method to setup the MDC context
* @param ctx {@link MdcContextAware}
*/
public static void mergeMdcContext(MdcContextAware ctx) {
for (Map.Entry<String, String> entry : ctx.getContextMap().entrySet()) {
MDC.put(entry.getKey(), entry.getValue());
}
}
/**
* Helper method to setup the MDC context
* @param ctx {@link MdcContextAware}
*/
public static void setMdcContext(MdcContextAware ctx) {
MDC.clear();
MDC.setContextMap(ctx.getContextMap());
}

/**
* Helper method to setup the MDC context
* @param ctxMap
*/
public static void setMdcContext(Map<String, String> ctxMap) {
MDC.clear();
if (ctxMap != null) {
MDC.setContextMap(ctxMap);
}
}

/**
* Helper method to carry the MDC from the current thread to the new thread
* @param runnable {@link Runnable}
* @return
*/
public static Runnable withMdc(Runnable runnable) {
Map<String, String> map = MDC.getCopyOfContextMap();
return () -> {
MDC.setContextMap(map);
runnable.run();
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.sql.Types;
import java.util.ArrayList;
import java.util.Date;
import java.util.function.Function;

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -234,12 +233,13 @@ public Schema translateSchemaToInternalFormat(ResultSet rs) throws SQLException
for (int i = 1; i <= nrOfColumns; i++) {
/**
* as per jdbc 4 specs, getColumnLabel will have the alias for the column, if not it will have the column name.
* so it may be a better option to check for columnlabel first and if in case it is null is someimplementation,
* so it may be a better option to check for columnlabel first and if in case it is null in some implementation,
* check for alias. Postgres is the one that has the null column names for calculated fields.
*/
String nameOrLabel = StringUtils.isNotEmpty(meta.getColumnLabel(i)) ? meta.getColumnLabel(i) : meta.getColumnName(i);
String columnName = nameOrLabel;
String sqlType = null;
boolean isColumnNullable = meta.isNullable(i) == ResultSetMetaData.columnNullable;
switch (meta.getColumnType(i)) {
case CHAR:
case LONGNVARCHAR:
Expand All @@ -251,25 +251,29 @@ public Schema translateSchemaToInternalFormat(ResultSet rs) throws SQLException
case NCLOB:
case OTHER:
case Types.SQLXML:
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
break;

// java.sql.RowId is interface, is seems to be database
// implementation specific, let's convert to String
case ROWID:
buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.STRING);
break;

case BIT:
case BOOLEAN:
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.BOOLEAN);
break;

case INTEGER:
if (meta.isSigned(i) || (meta.getPrecision(i) > 0 && meta.getPrecision(i) < MAX_DIGITS_IN_INT)) {
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.INT);
} else {
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.LONG);
}
break;

case SMALLINT:
case TINYINT:
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.INT);
break;

case BIGINT:
Expand All @@ -278,27 +282,21 @@ public Schema translateSchemaToInternalFormat(ResultSet rs) throws SQLException
// to strings as necessary
int precision = meta.getPrecision(i);
if (precision < 0 || precision > MAX_DIGITS_IN_BIGINT) {
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.STRING);
} else {
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.LONG);
}
break;

// java.sql.RowId is interface, is seems to be database
// implementation specific, let's convert to String
case ROWID:
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
break;

case FLOAT:
case REAL:
case 100: //Oracle BINARY_FLOAT type
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault();
buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.FLOAT);
break;

case DOUBLE:
case 101: //Oracle BINARY_DOUBLE type
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.DOUBLE);
break;

// Since Avro 1.8, LogicalType is supported.
Expand All @@ -323,25 +321,20 @@ public Schema translateSchemaToInternalFormat(ResultSet rs) throws SQLException
decimalScale = meta.getScale(i) > 0 ? meta.getScale(i) : DEFAULT_SCALE_VALUE;
}
final LogicalTypes.Decimal decimal = LogicalTypes.decimal(decimalPrecision, decimalScale);
addNullableField(builder, columnName,
u -> u.type(decimal.addToSchema(SchemaBuilder.builder().bytesType())));

buildColumnSchema(builder, columnName, isColumnNullable, decimal.addToSchema(SchemaBuilder.builder().bytesType()));
break;

case DATE:
addNullableField(builder, columnName,
u -> u.type(LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType())));
buildColumnSchema(builder, columnName, isColumnNullable, LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()));
break;

case TIME:
addNullableField(builder, columnName,
u -> u.type(LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType())));
buildColumnSchema(builder, columnName, isColumnNullable, LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType()));
break;

case -101: // Oracle's TIMESTAMP WITH TIME ZONE
case -102: // Oracle's TIMESTAMP WITH LOCAL TIME ZONE
addNullableField(builder, columnName,
u -> u.type(LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType())));
buildColumnSchema(builder, columnName, isColumnNullable, LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType()));
break;

case TIMESTAMP:
Expand All @@ -358,15 +351,15 @@ public Schema translateSchemaToInternalFormat(ResultSet rs) throws SQLException
if (sqlType != null) {
timestampMilliType.addProp(SOURCE_SQL_DATA_TYPE, sqlType);
}
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().type(timestampMilliType).endUnion().nullDefault();
buildColumnSchema(builder, columnName, isColumnNullable, timestampMilliType);
break;

case BINARY:
case VARBINARY:
case LONGVARBINARY:
case ARRAY:
case BLOB:
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault();
buildColumnSchema(builder, columnName, isColumnNullable, Schema.Type.BYTES);
break;


Expand All @@ -379,14 +372,22 @@ public Schema translateSchemaToInternalFormat(ResultSet rs) throws SQLException
return builder.endRecord();
}

private static void addNullableField(
SchemaBuilder.FieldAssembler<Schema> builder,
String columnName,
Function<SchemaBuilder.BaseTypeBuilder<SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>>,
SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>> func
) {
final SchemaBuilder.BaseTypeBuilder<SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>>
and = builder.name(columnName).type().unionOf().nullType().and();
func.apply(and).endUnion().noDefault();
/**
* helper method to build the avro schema for a given column
* @param builder the schema builder
* @param columnName name of the column
* @param isNullable indicate if the column is nullable
* @param type {@link Schema.Type} column type in avro
*/
private static void buildColumnSchema(SchemaBuilder.FieldAssembler<Schema> builder, String columnName, boolean isNullable, Schema.Type type) {
buildColumnSchema(builder, columnName, isNullable, Schema.create(type));
}

private static void buildColumnSchema(SchemaBuilder.FieldAssembler<Schema> builder, String columnName, boolean isNullable, Schema type) {
if (isNullable) {
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().type(type).endUnion().noDefault();
} else {
builder.name(columnName).type(type).noDefault();
}
}
}
Loading