Skip to content

Commit dc5dece

Browse files
author
Mike Skells
committed
review comments
Make TimestampSource a builder typos and header comments
1 parent c873b5a commit dc5dece

File tree

13 files changed

+276
-150
lines changed

13 files changed

+276
-150
lines changed

commons/src/main/java/io/aiven/kafka/connect/common/config/AivenCommonConfig.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,10 @@ public final ZoneId getFilenameTimezone() {
202202
}
203203

204204
public final TimestampSource getFilenameTimestampSource() {
205-
return TimestampSource.forConfiguration(getString(FILE_NAME_TIMESTAMP_SOURCE), getFilenameTimezone());
205+
return new TimestampSource.Builder()
206+
.configuration(getString(FILE_NAME_TIMESTAMP_SOURCE))
207+
.zoneId(getFilenameTimezone())
208+
.build();
206209
}
207210

208211
public final int getMaxRecordsPerFile() {

commons/src/main/java/io/aiven/kafka/connect/common/config/Path.java

Lines changed: 0 additions & 72 deletions
This file was deleted.
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright 2024 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.aiven.kafka.connect.common.config;
17+
18+
19+
import org.apache.kafka.connect.data.Field;
20+
import org.apache.kafka.connect.data.Schema;
21+
import org.apache.kafka.connect.data.Struct;
22+
import org.apache.kafka.connect.sink.SinkRecord;
23+
24+
import java.util.Arrays;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.regex.Pattern;
28+
29+
public final class PathAccess {
30+
private final String[] terms;
31+
32+
private PathAccess(final String[] terms) {
33+
this.terms = terms;
34+
}
35+
36+
/**
37+
* Parse a path definition string into a Path object. The path definition string is a '.' separated series of
38+
* strings, which are the terms in the path If the '.' is something that should be included in the terms, and you
39+
* want to use a different separator, then you can specify a '.' as the first character, and the separator as the
40+
* second character, and then the path is the rest of the string For example "a.b.c" would parse into a path with
41+
* terms "a", "b", "c" For example ".:a.b:c" would parse into a path with terms "a.b", "c"
42+
*
43+
* @return a PathAccess that can access a value in a nested structure
44+
*/
45+
public static PathAccess parse(final String pathDefinition) {
46+
final String pathDescription;
47+
final String pathSeparator;
48+
if (pathDefinition.length() > 1 && pathDefinition.charAt(0) == '.' ) {
49+
pathDescription = pathDefinition.substring(2);
50+
pathSeparator = pathDefinition.substring(1,2);
51+
} else {
52+
pathDescription = pathDefinition;
53+
pathSeparator = ".";
54+
}
55+
return new PathAccess(Pattern.compile(pathSeparator, Pattern.LITERAL).split(pathDescription));
56+
}
57+
58+
public Object extractDataFrom(final SinkRecord value) {
59+
Object current = value;
60+
61+
for (final String term : terms) {
62+
if (current == null) {
63+
return null;
64+
}
65+
if (current instanceof Struct) {
66+
final Struct struct = (Struct) current;
67+
final Schema schema = struct.schema();
68+
final Field field = schema.field(term);
69+
if (field == null) {
70+
return null;
71+
}
72+
current = struct.get(field);
73+
}
74+
if (current instanceof Map) {
75+
current = ((Map<?, ?>) current).get(term);
76+
} else if (current instanceof List) {
77+
try {
78+
current = ((List<?>) current).get(Integer.parseInt(term));
79+
} catch (NumberFormatException|IndexOutOfBoundsException e) {
80+
return null;
81+
}
82+
} else {
83+
return null;
84+
}
85+
}
86+
return current;
87+
}
88+
89+
@Override
90+
public String toString() {
91+
return "PathAccess[terms=" + Arrays.toString( terms) +"]";
92+
}
93+
}

commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java

Lines changed: 64 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818

1919
import java.time.Instant;
2020
import java.time.ZoneId;
21+
import java.time.ZoneOffset;
2122
import java.time.ZonedDateTime;
23+
import java.util.Locale;
24+
import java.util.Objects;
2225

2326
import org.apache.kafka.connect.header.Header;
2427
import org.apache.kafka.connect.sink.SinkRecord;
@@ -39,58 +42,67 @@ enum Type {
3942
CUSTOM
4043

4144
}
45+
class Builder {
46+
private ZoneId zoneId = ZoneOffset.UTC;
47+
private Type type;
48+
private String additionalParameters;
4249

43-
static TimestampSource forConfiguration(final String configuration, final ZoneId zoneId) {
44-
final String[] parts = configuration.split(":", 2);
45-
final String typeName = parts[0];
46-
final String context = parts.length > 1 ? parts[1] : null;
47-
for (final Type t : Type.values()) {
48-
if (t.name().equalsIgnoreCase(typeName)) {
49-
return create(t, context, zoneId);
50-
}
50+
public Builder zoneId(final ZoneId zoneId) {
51+
Objects.requireNonNull(zoneId, "zoneId cannot be null");
52+
this.zoneId = zoneId;
53+
return this;
5154
}
52-
throw new IllegalArgumentException(String.format("Unknown timestamp source: %s", configuration));
53-
}
5455

55-
private static TimestampSource create(Type t, String context, ZoneId zoneId) {
56-
switch (t) {
57-
case WALLCLOCK:
58-
if (context != null) {
59-
throw new IllegalArgumentException("Wallclock timestamp source does not support context");
60-
}
61-
return new WallclockTimestampSource(zoneId);
62-
case EVENT:
63-
if (context != null) {
64-
throw new IllegalArgumentException("Event timestamp source does not support context");
65-
}
66-
return new EventTimestampSource(zoneId);
67-
case DATA:
68-
if (context == null) {
69-
throw new IllegalArgumentException("Data timestamp source requires context");
70-
}
71-
return new DataTimestampSource(context, zoneId);
72-
case HEADER:
73-
if (context == null) {
74-
throw new IllegalArgumentException("Header timestamp source requires context");
75-
}
76-
return new HeaderTimestampSource(context, zoneId);
77-
case CUSTOM:
78-
if (context == null) {
79-
throw new IllegalArgumentException("Header timestamp source requires context");
80-
}
81-
final String[] parts = context.split(":", 2);
82-
final String className = parts[0];
83-
final String params = parts.length > 1 ? parts[1] : null;
84-
try {
85-
final Class<?> clazz = Class.forName(className);
86-
return (TimestampSource) clazz.getConstructor(String.class, ZoneId.class).newInstance(params, zoneId);
87-
} catch (final Exception e) {
88-
throw new IllegalArgumentException("Failed to create custom timestamp source", e);
89-
}
90-
default:
91-
throw new IllegalArgumentException(
92-
String.format("Unsupported timestamp extractor type: %s", t));
56+
public Builder configuration(final String configuration) {
57+
final String[] parts = configuration.split(":", 2);
58+
final String typeName = parts[0];
59+
this.type = Type.valueOf(typeName.toUpperCase(Locale.ENGLISH));
60+
61+
this.additionalParameters = parts.length > 1 ? parts[1] : null;
62+
return this;
9363
}
64+
65+
public TimestampSource build() {
66+
switch (type) {
67+
case WALLCLOCK:
68+
if (additionalParameters != null) {
69+
throw new IllegalArgumentException("Wallclock timestamp source does not support additionalParameters");
70+
}
71+
return new WallclockTimestampSource(zoneId);
72+
case EVENT:
73+
if (additionalParameters != null) {
74+
throw new IllegalArgumentException("Event timestamp source does not support additionalParameters");
75+
}
76+
return new EventTimestampSource(zoneId);
77+
case DATA:
78+
if (additionalParameters == null) {
79+
throw new IllegalArgumentException("Data timestamp source requires additionalParameters");
80+
}
81+
return new DataTimestampSource(additionalParameters, zoneId);
82+
case HEADER:
83+
if (additionalParameters == null) {
84+
throw new IllegalArgumentException("Header timestamp source requires additionalParameters");
85+
}
86+
return new HeaderTimestampSource(additionalParameters, zoneId);
87+
case CUSTOM:
88+
if (additionalParameters == null) {
89+
throw new IllegalArgumentException("Header timestamp source requires additionalParameters");
90+
}
91+
final String[] parts = additionalParameters.split(":", 2);
92+
final String className = parts[0];
93+
final String params = parts.length > 1 ? parts[1] : null;
94+
try {
95+
final Class<?> clazz = Class.forName(className);
96+
return (TimestampSource) clazz.getConstructor(String.class, ZoneId.class).newInstance(params, zoneId);
97+
} catch (final Exception e) {
98+
throw new IllegalArgumentException("Failed to create custom timestamp source", e);
99+
}
100+
default:
101+
throw new IllegalArgumentException(
102+
String.format("Unsupported timestamp extractor type: %s", type));
103+
}
104+
}
105+
94106
}
95107

96108
abstract class AbstractTimestampSource implements TimestampSource {
@@ -148,16 +160,16 @@ public ZonedDateTime time(final SinkRecord record) {
148160
}
149161

150162
final class DataTimestampSource extends AbstractTimestampSource {
151-
private final Path path;
163+
private final PathAccess path;
152164

153165
DataTimestampSource(final String pathDefinition, final ZoneId zoneId) {
154166
super(zoneId, Type.DATA);
155-
this.path = Path.parse(pathDefinition);
167+
this.path = PathAccess.parse(pathDefinition);
156168
}
157169

158170
@Override
159171
public ZonedDateTime time(final SinkRecord record) {
160-
Object value = path.extractDataFrom(record);
172+
final Object value = path.extractDataFrom(record);
161173
return rawTime(value);
162174
}
163175

@@ -173,7 +185,7 @@ final class HeaderTimestampSource extends AbstractTimestampSource {
173185

174186
@Override
175187
public ZonedDateTime time(final SinkRecord record) {
176-
Header header = record.headers().lastWithName(headerName);
188+
final Header header = record.headers().lastWithName(headerName);
177189
return rawTime(header == null ? null : header.value());
178190
}
179191
}

commons/src/main/java/io/aiven/kafka/connect/common/config/validators/TimestampSourceValidator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@
2121

2222
import io.aiven.kafka.connect.common.config.TimestampSource;
2323

24-
import java.time.ZoneOffset;
25-
2624
public class TimestampSourceValidator implements ConfigDef.Validator {
2725

2826
@Override
2927
public void ensureValid(final String name, final Object value) {
3028
try {
31-
TimestampSource.forConfiguration(value.toString(), ZoneOffset.UTC);
29+
new TimestampSource.Builder()
30+
.configuration(value.toString())
31+
.build();
3232
} catch (final Exception e) { // NOPMD AvoidCatchingGenericException
3333
throw new ConfigException(name, value, e.getMessage());
3434
}

0 commit comments

Comments
 (0)