Skip to content

Commit 878f724

Browse files
authored
[BitSail]Add dirty collector options. (#391)
[BitSail][Improve]Add dirty collector options.
1 parent c69cc1f commit 878f724

File tree

23 files changed

+198
-57
lines changed

23 files changed

+198
-57
lines changed

bitsail-base/src/main/java/com/bytedance/bitsail/base/dirty/AbstractDirtyCollector.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public abstract class AbstractDirtyCollector implements Closeable, Serializable
5757
*/
5858
private int dirtyCount;
5959

60+
private final boolean allowed;
61+
6062
public AbstractDirtyCollector(BitSailConfiguration jobConf,
6163
int taskId) {
6264
this.jobConf = jobConf;
@@ -68,6 +70,8 @@ public AbstractDirtyCollector(BitSailConfiguration jobConf,
6870

6971
this.dirtySampleRatio = jobConf.get(CommonOptions.DirtyRecordOptions.DIRTY_SAMPLE_RATIO);
7072
this.dirtySampleRandom = new Random();
73+
74+
this.allowed = jobConf.get(CommonOptions.DirtyRecordOptions.DIRTY_RECORD_SKIP_ENABLED);
7175
}
7276

7377
/**
@@ -78,6 +82,13 @@ public AbstractDirtyCollector(BitSailConfiguration jobConf,
7882
* @param processingTime Processing timestamp for the record.
7983
*/
8084
public void collectDirty(Object dirtyObj, Throwable e, long processingTime) {
85+
if (!allowed) {
86+
throw new RuntimeException(
87+
String.format("Found dirty data but not allowed. " +
88+
"Please enable skip dirty record by adding user defined config %s=true. \n Dirty record: %s , \n Exception message: %s",
89+
CommonOptions.DirtyRecordOptions.DIRTY_RECORD_SKIP_ENABLED.key(),
90+
dirtyObj.toString(), e.getMessage()), e);
91+
}
8192
if (isRunning && shouldSample() && !Objects.isNull(dirtyObj)) {
8293
try {
8394
collect(dirtyObj, e, processingTime);

bitsail-common/src/main/java/com/bytedance/bitsail/common/column/ColumnCast.java

Lines changed: 80 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.slf4j.LoggerFactory;
2929

3030
import java.io.UnsupportedEncodingException;
31+
import java.sql.Time;
3132
import java.time.Instant;
3233
import java.time.LocalDate;
3334
import java.time.LocalDateTime;
@@ -40,6 +41,7 @@
4041
import java.time.temporal.TemporalAccessor;
4142
import java.util.Date;
4243
import java.util.List;
44+
import java.util.Objects;
4345

4446
@Deprecated
4547
public final class ColumnCast {
@@ -59,7 +61,7 @@ public final class ColumnCast {
5961
private static ZoneId dateTimeZone;
6062
private static volatile boolean enabled = false;
6163

62-
public static void initColumnCast(BitSailConfiguration commonConfiguration) {
64+
public static synchronized void initColumnCast(BitSailConfiguration commonConfiguration) {
6365
if (enabled) {
6466
return;
6567
}
@@ -77,14 +79,14 @@ public static void initColumnCast(BitSailConfiguration commonConfiguration) {
7779
.TIME_PATTERN);
7880
encoding = commonConfiguration.get(CommonOptions.DateFormatOptions.COLUMN_ENCODING);
7981

80-
formatters = Lists.newArrayList();
82+
formatters = Lists.newCopyOnWriteArrayList();
8183
dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimePattern);
8284
dateFormatter = DateTimeFormatter.ofPattern(datePattern);
8385
timeFormatter = DateTimeFormatter.ofPattern(timePattern);
84-
commonConfiguration.get(CommonOptions.DateFormatOptions.EXTRA_FORMATS)
85-
.forEach(pattern -> formatters.add(DateTimeFormatter.ofPattern(pattern)));
8686
formatters.add(dateTimeFormatter);
8787
formatters.add(dateFormatter);
88+
commonConfiguration.get(CommonOptions.DateFormatOptions.EXTRA_FORMATS)
89+
.forEach(pattern -> formatters.add(DateTimeFormatter.ofPattern(pattern)));
8890
enabled = true;
8991
}
9092

@@ -98,18 +100,7 @@ public static Date string2Date(StringColumn column) {
98100
for (DateTimeFormatter formatter : formatters) {
99101
try {
100102
TemporalAccessor parse = formatter.parse(dateStr);
101-
LocalDateTime localDateTime = null;
102-
LocalDate localDate = LocalDate.from(parse);
103-
if (parse.isSupported(ChronoField.HOUR_OF_DAY)
104-
|| parse.isSupported(ChronoField.HOUR_OF_DAY)
105-
|| parse.isSupported(ChronoField.MINUTE_OF_HOUR)
106-
|| parse.isSupported(ChronoField.SECOND_OF_MINUTE)
107-
|| parse.isSupported(ChronoField.MICRO_OF_SECOND)) {
108-
localDateTime = LocalDateTime.of(localDate, LocalTime.from(parse));
109-
} else {
110-
localDateTime = localDate.atStartOfDay();
111-
}
112-
return Date.from(localDateTime.atZone(dateTimeZone).toInstant());
103+
return fromTemporalAccessor(parse);
113104
} catch (Exception e) {
114105
LOG.debug("Formatter = {} parse string {} failed.", formatter, dateStr, e);
115106
//ignore
@@ -118,25 +109,84 @@ public static Date string2Date(StringColumn column) {
118109
throw new IllegalArgumentException(String.format("String [%s] can't be parse by all formatter.", dateStr));
119110
}
120111

112+
private static Date fromTemporalAccessor(TemporalAccessor temporalAccessor) {
113+
LocalDate localDate = null;
114+
LocalTime localTime = null;
115+
116+
if (temporalAccessor.isSupported(ChronoField.YEAR_OF_ERA)
117+
&& temporalAccessor.isSupported(ChronoField.MONTH_OF_YEAR)
118+
&& temporalAccessor.isSupported(ChronoField.DAY_OF_MONTH)) {
119+
localDate = LocalDate.of(
120+
temporalAccessor.get(ChronoField.YEAR_OF_ERA),
121+
temporalAccessor.get(ChronoField.MONTH_OF_YEAR),
122+
temporalAccessor.get(ChronoField.DAY_OF_MONTH));
123+
}
124+
if (temporalAccessor.isSupported(ChronoField.HOUR_OF_DAY)
125+
&& temporalAccessor.isSupported(ChronoField.MINUTE_OF_HOUR)
126+
&& temporalAccessor.isSupported(ChronoField.SECOND_OF_MINUTE)) {
127+
localTime = LocalTime.of(
128+
temporalAccessor.get(ChronoField.HOUR_OF_DAY),
129+
temporalAccessor.get(ChronoField.MINUTE_OF_HOUR),
130+
temporalAccessor.get(ChronoField.SECOND_OF_MINUTE),
131+
temporalAccessor.get(ChronoField.NANO_OF_SECOND)
132+
);
133+
}
134+
if (Objects.nonNull(localDate)) {
135+
LocalDateTime localDateTime;
136+
if (Objects.nonNull(localTime)) {
137+
localDateTime = LocalDateTime.of(localDate, localTime);
138+
} else {
139+
localDateTime = localDate.atStartOfDay();
140+
}
141+
return Date.from(localDateTime.atZone(dateTimeZone).toInstant());
142+
}
143+
if (Objects.nonNull(localTime)) {
144+
return new Time(
145+
localTime.getHour(),
146+
localTime.getMinute(),
147+
localTime.getSecond()
148+
);
149+
}
150+
throw BitSailException.asBitSailException(CommonErrorCode.CONVERT_NOT_SUPPORT,
151+
String.format("Temporal %s can't convert to date.", temporalAccessor));
152+
}
153+
121154
public static String date2String(final DateColumn column) {
122155
checkState();
123156
if (null == column.asDate()) {
124157
return null;
125158
}
126-
Date date = column.asDate();
127-
OffsetDateTime offsetDateTime = Instant.ofEpochMilli(date.toInstant().toEpochMilli())
128-
.atZone(dateTimeZone).toOffsetDateTime();
129-
130-
switch (column.getSubType()) {
131-
case DATE:
132-
return dateFormatter.format(offsetDateTime);
133-
case TIME:
134-
return timeFormatter.format(offsetDateTime);
135-
case DATETIME:
136-
return dateTimeFormatter.format(offsetDateTime);
137-
default:
138-
throw BitSailException
139-
.asBitSailException(CommonErrorCode.CONVERT_NOT_SUPPORT, "");
159+
DateColumn.DateType subType = column.getSubType();
160+
if (DateColumn.DateType.DATE.equals(subType)
161+
|| DateColumn.DateType.TIME.equals(subType)
162+
|| DateColumn.DateType.DATETIME.equals(subType)) {
163+
Date date = column.asDate();
164+
OffsetDateTime offsetDateTime = Instant.ofEpochMilli(date.toInstant().toEpochMilli())
165+
.atZone(dateTimeZone).toOffsetDateTime();
166+
switch (subType) {
167+
case DATE:
168+
return dateFormatter.format(offsetDateTime);
169+
case TIME:
170+
return timeFormatter.format(offsetDateTime);
171+
case DATETIME:
172+
return dateTimeFormatter.format(offsetDateTime);
173+
default:
174+
throw BitSailException
175+
.asBitSailException(CommonErrorCode.CONVERT_NOT_SUPPORT, "");
176+
}
177+
} else {
178+
Object rawData = column.getRawData();
179+
switch (subType) {
180+
case LOCAL_DATE:
181+
return dateFormatter.format(((LocalDate) rawData).atStartOfDay().atZone(dateTimeZone));
182+
case LOCAL_TIME:
183+
return timeFormatter.format((LocalTime) rawData);
184+
case LOCAL_DATE_TIME:
185+
return dateTimeFormatter.format(((LocalDateTime) rawData).atZone(dateTimeZone));
186+
default:
187+
throw BitSailException
188+
.asBitSailException(CommonErrorCode.CONVERT_NOT_SUPPORT, "");
189+
}
140190
}
141191
}
142192

bitsail-common/src/main/java/com/bytedance/bitsail/common/column/DateColumn.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.math.BigDecimal;
2323
import java.math.BigInteger;
24+
import java.sql.Time;
2425
import java.time.LocalDate;
2526
import java.time.LocalDateTime;
2627
import java.time.LocalTime;
@@ -102,6 +103,10 @@ public Date asDate() {
102103
return new Date(localDateTime.atZone(ZoneOffset.systemDefault())
103104
.toInstant().toEpochMilli());
104105
}
106+
if (getRawData() instanceof LocalTime) {
107+
LocalTime localTime = ((LocalTime) getRawData());
108+
return new Time(localTime.getHour(), localTime.getMinute(), localTime.getSecond());
109+
}
105110
return new Date((Long) this.getRawData());
106111
}
107112

bitsail-common/src/main/java/com/bytedance/bitsail/common/configuration/BitSailConfiguration.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -763,12 +763,14 @@ private <T> T findObjectByConfig(final String path, final ConfigOption<T> config
763763
result = getInt(path);
764764
break;
765765
case List:
766+
case ArrayList:
766767
result = getList(path);
767768
break;
768769
case Long:
769770
result = getLong(path);
770771
break;
771772
case Map:
773+
case HashMap:
772774
result = getMap(path);
773775
break;
774776
default:
@@ -935,6 +937,6 @@ private Map<String, String> flatten(Map.Entry<String, Object> in) {
935937

936938
private enum ConfigType {
937939
//Basic data type enumeration
938-
Boolean, Character, Double, Float, Integer, List, Long, Map, String
940+
Boolean, Character, Double, Float, Integer, List, Long, Map, String, ArrayList, HashMap
939941
}
940942
}

bitsail-common/src/main/java/com/bytedance/bitsail/common/option/CommonOptions.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ public interface CommonOptions {
6868

6969
ConfigOption<Integer> GLOBAL_PARALLELISM_NUM =
7070
key(COMMON_PREFIX + "global_parallelism_num")
71-
.defaultValue(-1);
71+
.defaultValue(-1)
72+
.withAlias("global_parallelism_num");
73+
7274

7375
/**
7476
* an optional option to store user-defined common parameters

bitsail-components/bitsail-flink-row-parser/src/main/java/com/bytedance/bitsail/batch/parser/row/KVRowBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3232
import org.apache.flink.types.Row;
3333

34-
abstract class KVRowBuilder<T> implements RowBuilder<T> {
34+
public abstract class KVRowBuilder<T> implements RowBuilder<T> {
3535

3636
private ContentType contentType;
3737
private BytesParser bytesParser;

bitsail-components/bitsail-flink-row-parser/src/main/java/com/bytedance/bitsail/batch/parser/row/TextRowBuilder.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ public void build(T value, Row reuse, RowTypeInfo rowTypeInfo, int[] fieldIndexe
6363
case BINARY:
6464
buildRowWithParser(value, reuse, rowTypeInfo, bytesParser);
6565
break;
66-
case CSV: buildRowWithCsvParser(value, reuse, StringUtils.EMPTY, rowTypeInfo, bytesParser, fieldIndexes);
66+
case CSV:
67+
buildRowWithCsvParser(value, reuse, rowTypeInfo, bytesParser, fieldIndexes);
6768
break;
6869
case PLAIN:
6970
buildPlainTextRow(value.toString(), reuse, rowTypeInfo);
@@ -87,11 +88,11 @@ private void buildRowWithParser(T value, Row reuse, RowTypeInfo rowTypeInfo, @No
8788
/**
8889
* text file is a csv file which needs a parser to parse
8990
*/
90-
private void buildRowWithCsvParser(T value, Row reuse, String mandatoryEncoding, RowTypeInfo rowTypeInfo, @NonNull BytesParser bytesParser,
91+
private void buildRowWithCsvParser(T value, Row reuse, RowTypeInfo rowTypeInfo, @NonNull BytesParser bytesParser,
9192
int[] fieldIndexes
9293
) throws BitSailException {
9394
try {
94-
((CsvBytesParser) bytesParser).parse(reuse, value, mandatoryEncoding, rowTypeInfo, fieldIndexes);
95+
((CsvBytesParser) bytesParser).parse(reuse, value, StringUtils.EMPTY, rowTypeInfo, fieldIndexes);
9596
} catch (Exception e) {
9697
throw BitSailException.asBitSailException(ParserErrorCode.ILLEGAL_TEXT, "value: " + value.toString(), e);
9798
}

bitsail-components/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,24 @@
3838
<module>bitsail-component-metrics</module>
3939
</modules>
4040

41+
<dependencyManagement>
42+
<dependencies>
43+
<dependency>
44+
<groupId>com.bytedance.bitsail</groupId>
45+
<artifactId>bitsail-base</artifactId>
46+
<version>${revision}</version>
47+
<scope>provided</scope>
48+
</dependency>
49+
50+
<dependency>
51+
<groupId>com.bytedance.bitsail</groupId>
52+
<artifactId>bitsail-common</artifactId>
53+
<version>${revision}</version>
54+
<scope>provided</scope>
55+
</dependency>
56+
</dependencies>
57+
</dependencyManagement>
58+
4159
<dependencies>
4260
<dependency>
4361
<groupId>org.mockito</groupId>

bitsail-connectors/connector-clickhouse/src/main/java/com/bytedance/bitsail/connector/clickhouse/source/ClickhouseSource.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ public ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf, B
9090
LOG.warn("Failed to compute splits for computing parallelism, will use default 1.");
9191
}
9292
}
93-
return new ParallelismAdvice(false, parallelism);
93+
return ParallelismAdvice.builder()
94+
.adviceParallelism(parallelism)
95+
.enforceDownStreamChain(false)
96+
.build();
9497
}
9598
}

bitsail-connectors/connector-druid/pom.xml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,49 @@
2929

3030
<properties>
3131
<druid.version>0.22.1</druid.version>
32+
<netty.version>4.1.29.Final</netty.version>
3233
</properties>
3334

35+
<dependencyManagement>
36+
<dependencies>
37+
<dependency>
38+
<groupId>io.netty</groupId>
39+
<artifactId>netty-resolver</artifactId>
40+
<version>${netty.version}</version>
41+
</dependency>
42+
43+
<dependency>
44+
<groupId>io.netty</groupId>
45+
<artifactId>netty-all</artifactId>
46+
<version>${netty.version}</version>
47+
</dependency>
48+
49+
<dependency>
50+
<groupId>io.netty</groupId>
51+
<artifactId>netty-common</artifactId>
52+
<version>${netty.version}</version>
53+
</dependency>
54+
55+
<dependency>
56+
<groupId>io.netty</groupId>
57+
<artifactId>netty-transport-native-unix-common</artifactId>
58+
<version>${netty.version}</version>
59+
</dependency>
60+
61+
<dependency>
62+
<groupId>io.netty</groupId>
63+
<artifactId>netty-transport</artifactId>
64+
<version>${netty.version}</version>
65+
</dependency>
66+
67+
<dependency>
68+
<groupId>io.netty</groupId>
69+
<artifactId>netty-buffer</artifactId>
70+
<version>${netty.version}</version>
71+
</dependency>
72+
</dependencies>
73+
</dependencyManagement>
74+
3475
<dependencies>
3576
<dependency>
3677
<groupId>com.bytedance.bitsail</groupId>

0 commit comments

Comments
 (0)