Skip to content

Commit ea0cd33

Browse files
author
Mike Skells
committed
review comments
Remove a few simple classes, add a DataExtractor to read values from the `sinkRecord`, and make a few tidy-ups
1 parent dc5dece commit ea0cd33

File tree

7 files changed

+296
-109
lines changed

7 files changed

+296
-109
lines changed

commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java

Lines changed: 66 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import java.util.Locale;
2424
import java.util.Objects;
2525

26-
import org.apache.kafka.connect.header.Header;
26+
import io.aiven.kafka.connect.common.config.extractors.DataExtractor;
27+
import io.aiven.kafka.connect.common.config.extractors.HeaderValueExtractor;
28+
import io.aiven.kafka.connect.common.config.extractors.SimpleValuePath;
2729
import org.apache.kafka.connect.sink.SinkRecord;
2830

2931
public interface TimestampSource {
@@ -38,7 +40,7 @@ enum Type {
3840
WALLCLOCK,
3941
EVENT,
4042
HEADER,
41-
DATA,
43+
SIMPLE_DATA,
4244
CUSTOM
4345

4446
}
@@ -47,16 +49,56 @@ class Builder {
4749
private Type type;
4850
private String additionalParameters;
4951

52+
/**
53+
* Sets the zoneId to be used. If this method isn't called, the default is UTC.
54+
* @return this
55+
* @throws NullPointerException if zoneId is null
56+
*/
5057
public Builder zoneId(final ZoneId zoneId) {
5158
Objects.requireNonNull(zoneId, "zoneId cannot be null");
5259
this.zoneId = zoneId;
5360
return this;
5461
}
5562

63+
/**
64+
* sets the type of the timestamp source and associated parameters (if needed)
65+
* The format of the configuration is {@code <type>[:<data>]}
66+
* i.e. the type name, optionally followed by data.
67+
* <br>
68+
* The data is type specific
69+
* <p>
70+
* For type WALLCLOCK or EVENT, no data is allowed
71+
* </p>
72+
* <p>
73+
* For type SIMPLE_DATA, data is required, and is a '.' separated series of
74+
* terms in the path
75+
* <br>If the '.' is something that should be included in the terms, and you
76+
* want to use a different separator, then you can specify a '.' as the first character, and the separator as the
77+
* second character, and then the path is the rest of the string
78+
* <br>For example "SIMPLE_DATA:a.b.c" would use a path with
79+
* terms "a", "b", "c"
80+
* <br>For example "SIMPLE_DATA:.:a.b:c" would use a path with terms "a.b", "c"
81+
* </p>
82+
* <p>For type HEADER, data is required, and is the name of the header to extract
83+
* <br>For example "HEADER:foo" would use the "foo" header (or null if it's not available in the SinkRecord)
84+
* </p>
85+
* <p>
86+
* For type CUSTOM, data is required, and is the name of the class to use, and any additional parameters for that custom time source.
87+
* The specified class must implement the TimestampSource interface and have a public constructor that takes a String and a ZoneId. For the meaning of the data, see the documentation of the custom class.
88+
* <br>For example "CUSTOM:my.custom.timesource:some more data" would be similar to calling new my.custom.timesource("some more data", zoneId)
89+
* </p>
90+
*
91+
92+
* @return this
93+
*/
5694
public Builder configuration(final String configuration) {
5795
final String[] parts = configuration.split(":", 2);
5896
final String typeName = parts[0];
59-
this.type = Type.valueOf(typeName.toUpperCase(Locale.ENGLISH));
97+
try {
98+
this.type = Type.valueOf(typeName.toUpperCase(Locale.ENGLISH));
99+
} catch (final IllegalArgumentException e) {
100+
throw new IllegalArgumentException("Unknown timestamp source: "+typeName);
101+
}
60102

61103
this.additionalParameters = parts.length > 1 ? parts[1] : null;
62104
return this;
@@ -74,16 +116,16 @@ public TimestampSource build() {
74116
throw new IllegalArgumentException("Event timestamp source does not support additionalParameters");
75117
}
76118
return new EventTimestampSource(zoneId);
77-
case DATA:
119+
case SIMPLE_DATA:
78120
if (additionalParameters == null) {
79121
throw new IllegalArgumentException("Data timestamp source requires additionalParameters");
80122
}
81-
return new DataTimestampSource(additionalParameters, zoneId);
123+
return new SimpleTimestampSource(zoneId, Type.SIMPLE_DATA, SimpleValuePath.parse(additionalParameters));
82124
case HEADER:
83125
if (additionalParameters == null) {
84126
throw new IllegalArgumentException("Header timestamp source requires additionalParameters");
85127
}
86-
return new HeaderTimestampSource(additionalParameters, zoneId);
128+
return new SimpleTimestampSource(zoneId, Type.HEADER, new HeaderValueExtractor(additionalParameters));
87129
case CUSTOM:
88130
if (additionalParameters == null) {
89131
throw new IllegalArgumentException("Header timestamp source requires additionalParameters");
@@ -105,41 +147,48 @@ public TimestampSource build() {
105147

106148
}
107149

108-
abstract class AbstractTimestampSource implements TimestampSource {
150+
class SimpleTimestampSource implements TimestampSource {
109151
protected final ZoneId zoneId;
110152
private final Type type;
153+
private final DataExtractor dataExtractor;
111154

112-
protected AbstractTimestampSource(final ZoneId zoneId, final Type type) {
155+
protected SimpleTimestampSource(final ZoneId zoneId, final Type type, DataExtractor dataExtractor) {
113156
this.zoneId = zoneId;
114157
this.type = type;
158+
this.dataExtractor = dataExtractor;
115159
}
116160

117161
@Override
118162
public Type type() {
119163
return type;
120164
}
121165

122-
protected ZonedDateTime rawTime(final Object rawValue) {
166+
@Override
167+
public ZonedDateTime time(SinkRecord record) {
168+
return fromRawTime(dataExtractor.extractDataFrom(record));
169+
}
170+
171+
protected ZonedDateTime fromRawTime(final Object rawValue) {
123172
if (rawValue == null) {
124173
return null;
125174
} else if (rawValue instanceof Long) {
126-
return time((Long) rawValue);
175+
return withZone((Long) rawValue);
127176
} else if (rawValue instanceof ZonedDateTime) {
128177
return (ZonedDateTime) rawValue;
129178
} else if (rawValue instanceof Instant) {
130-
return time(((Instant) rawValue).toEpochMilli());
179+
return withZone(((Instant) rawValue).toEpochMilli());
131180
}
132181
return null;
133182
}
134183

135-
protected ZonedDateTime time(final long timestamp) {
184+
protected ZonedDateTime withZone(final long timestamp) {
136185
return ZonedDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zoneId);
137186
}
138187
}
139188

140-
final class WallclockTimestampSource extends AbstractTimestampSource {
189+
final class WallclockTimestampSource extends SimpleTimestampSource {
141190
WallclockTimestampSource(final ZoneId zoneId) {
142-
super(zoneId, Type.WALLCLOCK);
191+
super(zoneId, Type.WALLCLOCK, null);
143192
}
144193

145194
@Override
@@ -148,45 +197,14 @@ public ZonedDateTime time(final SinkRecord record) {
148197
}
149198
}
150199

151-
final class EventTimestampSource extends AbstractTimestampSource {
200+
final class EventTimestampSource extends SimpleTimestampSource {
152201
EventTimestampSource(final ZoneId zoneId) {
153-
super(zoneId, Type.EVENT);
154-
}
155-
156-
@Override
157-
public ZonedDateTime time(final SinkRecord record) {
158-
return time(record.timestamp());
159-
}
160-
}
161-
162-
final class DataTimestampSource extends AbstractTimestampSource {
163-
private final PathAccess path;
164-
165-
DataTimestampSource(final String pathDefinition, final ZoneId zoneId) {
166-
super(zoneId, Type.DATA);
167-
this.path = PathAccess.parse(pathDefinition);
168-
}
169-
170-
@Override
171-
public ZonedDateTime time(final SinkRecord record) {
172-
final Object value = path.extractDataFrom(record);
173-
return rawTime(value);
174-
}
175-
176-
}
177-
178-
final class HeaderTimestampSource extends AbstractTimestampSource {
179-
private final String headerName;
180-
181-
HeaderTimestampSource(final String headerName, final ZoneId zoneId) {
182-
super(zoneId, Type.HEADER);
183-
this.headerName = headerName;
202+
super(zoneId, Type.EVENT, null);
184203
}
185204

186205
@Override
187206
public ZonedDateTime time(final SinkRecord record) {
188-
final Header header = record.headers().lastWithName(headerName);
189-
return rawTime(header == null ? null : header.value());
207+
return withZone(record.timestamp());
190208
}
191209
}
192210
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.aiven.kafka.connect.common.config.extractors;
2+
3+
import org.apache.kafka.connect.sink.SinkRecord;
4+
5+
public interface DataExtractor {
6+
7+
Object extractDataFrom(final SinkRecord record);
8+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.aiven.kafka.connect.common.config.extractors;
2+
3+
import org.apache.kafka.connect.header.Header;
4+
import org.apache.kafka.connect.sink.SinkRecord;
5+
6+
public class HeaderValueExtractor implements DataExtractor {
7+
private final String headerKey;
8+
9+
public HeaderValueExtractor(final String headerKey) {
10+
this.headerKey = headerKey;
11+
}
12+
13+
public Object extractDataFrom(final SinkRecord record) {
14+
final Header header = record.headers().lastWithName(headerKey);
15+
return header == null ? null : header.value();
16+
}
17+
}

commons/src/main/java/io/aiven/kafka/connect/common/config/PathAccess.java renamed to commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/SimpleValuePath.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.aiven.kafka.connect.common.config;
16+
package io.aiven.kafka.connect.common.config.extractors;
1717

1818

1919
import org.apache.kafka.connect.data.Field;
@@ -26,10 +26,10 @@
2626
import java.util.Map;
2727
import java.util.regex.Pattern;
2828

29-
public final class PathAccess {
29+
public final class SimpleValuePath implements DataExtractor {
3030
private final String[] terms;
3131

32-
private PathAccess(final String[] terms) {
32+
private SimpleValuePath(final String[] terms) {
3333
this.terms = terms;
3434
}
3535

@@ -42,7 +42,7 @@ private PathAccess(final String[] terms) {
4242
*
4343
* @return a SimpleValuePath that can access a value in a nested structure
4444
*/
45-
public static PathAccess parse(final String pathDefinition) {
45+
public static SimpleValuePath parse(final String pathDefinition) {
4646
final String pathDescription;
4747
final String pathSeparator;
4848
if (pathDefinition.length() > 1 && pathDefinition.charAt(0) == '.' ) {
@@ -52,11 +52,11 @@ public static PathAccess parse(final String pathDefinition) {
5252
pathDescription = pathDefinition;
5353
pathSeparator = ".";
5454
}
55-
return new PathAccess(Pattern.compile(pathSeparator, Pattern.LITERAL).split(pathDescription));
55+
return new SimpleValuePath(Pattern.compile(pathSeparator, Pattern.LITERAL).split(pathDescription));
5656
}
5757

58-
public Object extractDataFrom(final SinkRecord value) {
59-
Object current = value;
58+
public Object extractDataFrom(final SinkRecord record) {
59+
Object current = record.value();
6060

6161
for (final String term : terms) {
6262
if (current == null) {
@@ -70,8 +70,7 @@ public Object extractDataFrom(final SinkRecord value) {
7070
return null;
7171
}
7272
current = struct.get(field);
73-
}
74-
if (current instanceof Map) {
73+
} else if (current instanceof Map) {
7574
current = ((Map<?, ?>) current).get(term);
7675
} else if (current instanceof List) {
7776
try {
@@ -88,6 +87,6 @@ public Object extractDataFrom(final SinkRecord value) {
8887

8988
@Override
9089
public String toString() {
91-
return "PathAccess[terms=" + Arrays.toString( terms) +"]";
90+
return "Path[terms=" + Arrays.toString( terms) +"]";
9291
}
9392
}

commons/src/test/java/io/aiven/kafka/connect/common/config/PathAccessTest.java

Lines changed: 0 additions & 51 deletions
This file was deleted.

0 commit comments

Comments
 (0)