|
1 | 1 | /*
|
2 |
| - * Copyright (C) 2019 Can Elmas <[email protected]> |
| 2 | + * Copyright (C) 2020 Can Elmas <[email protected]> |
3 | 3 | *
|
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License");
|
5 | 5 | * you may not use this file except in compliance with the License.
|
|
16 | 16 |
|
17 | 17 | package com.canelmas.kafka.connect;
|
18 | 18 |
|
| 19 | +import io.confluent.connect.storage.common.StorageCommonConfig; |
19 | 20 | import io.confluent.connect.storage.errors.PartitionException;
|
| 21 | +import io.confluent.connect.storage.partitioner.PartitionerConfig; |
20 | 22 | import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
|
21 |
| -import io.confluent.connect.storage.partitioner.TimestampExtractor; |
22 | 23 | import io.confluent.connect.storage.util.DataUtils;
|
23 |
| -import org.apache.kafka.common.config.ConfigException; |
24 | 24 | import org.apache.kafka.connect.connector.ConnectRecord;
|
25 |
| -import org.apache.kafka.connect.data.Schema; |
26 | 25 | import org.apache.kafka.connect.data.Struct;
|
27 |
| -import org.apache.kafka.connect.errors.ConnectException; |
28 | 26 | import org.apache.kafka.connect.sink.SinkRecord;
|
29 |
| -import org.joda.time.DateTime; |
30 | 27 | import org.joda.time.DateTimeZone;
|
31 |
| -import org.joda.time.format.DateTimeFormat; |
32 |
| -import org.joda.time.format.DateTimeFormatter; |
33 | 28 | import org.slf4j.Logger;
|
34 | 29 | import org.slf4j.LoggerFactory;
|
35 | 30 |
|
| 31 | +import java.util.List; |
36 | 32 | import java.util.Locale;
|
37 | 33 | import java.util.Map;
|
38 | 34 |
|
39 | 35 | public final class FieldAndTimeBasedPartitioner<T> extends TimeBasedPartitioner<T> {
|
40 | 36 |
|
| 37 | + public static final String PARTITION_FIELD_FORMAT_PATH_CONFIG = "partition.field.format.path"; |
| 38 | + public static final String PARTITION_FIELD_FORMAT_PATH_DOC = "Whether directory labels should be included when partitioning for custom fields e.g. " + |
| 39 | + "whether this 'orgId=XXXX/appId=ZZZZ/customField=YYYY' or this 'XXXX/ZZZZ/YYYY'."; |
| 40 | + public static final String PARTITION_FIELD_FORMAT_PATH_DISPLAY = "Partition Field Format Path"; |
| 41 | + public static final boolean PARTITION_FIELD_FORMAT_PATH_DEFAULT = true; |
41 | 42 | private static final Logger log = LoggerFactory.getLogger(FieldAndTimeBasedPartitioner.class);
|
42 |
| - |
43 |
| - private long partitionDurationMs; |
44 |
| - private DateTimeFormatter formatter; |
45 |
| - private TimestampExtractor timestampExtractor; |
46 |
| - |
47 | 43 | private PartitionFieldExtractor partitionFieldExtractor;
|
48 | 44 |
|
49 | 45 | protected void init(long partitionDurationMs, String pathFormat, Locale locale, DateTimeZone timeZone, Map<String, Object> config) {
|
| 46 | + super.init(partitionDurationMs, pathFormat, locale, timeZone, config); |
50 | 47 |
|
51 |
| - this.delim = (String)config.get("directory.delim"); |
52 |
| - this.partitionDurationMs = partitionDurationMs; |
53 |
| - |
54 |
| - try { |
55 |
| - |
56 |
| - this.formatter = getDateTimeFormatter(pathFormat, timeZone).withLocale(locale); |
57 |
| - this.timestampExtractor = this.newTimestampExtractor((String)config.get("timestamp.extractor")); |
58 |
| - this.timestampExtractor.configure(config); |
59 |
| - |
60 |
| - this.partitionFieldExtractor = new PartitionFieldExtractor((String)config.get("partition.field")); |
61 |
| - |
62 |
| - } catch (IllegalArgumentException e) { |
63 |
| - |
64 |
| - ConfigException ce = new ConfigException("path.format", pathFormat, e.getMessage()); |
65 |
| - ce.initCause(e); |
66 |
| - throw ce; |
67 |
| - |
68 |
| - } |
69 |
| - } |
70 |
| - |
71 |
| - private static DateTimeFormatter getDateTimeFormatter(String str, DateTimeZone timeZone) { |
72 |
| - return DateTimeFormat.forPattern(str).withZone(timeZone); |
73 |
| - } |
74 |
| - |
75 |
| - public static long getPartition(long timeGranularityMs, long timestamp, DateTimeZone timeZone) { |
76 |
| - |
77 |
| - long adjustedTimestamp = timeZone.convertUTCToLocal(timestamp); |
78 |
| - long partitionedTime = adjustedTimestamp / timeGranularityMs * timeGranularityMs; |
| 48 | + final List<String> fieldNames = (List<String>) config.get(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG); |
| 49 | + final boolean formatPath = (Boolean) config.getOrDefault(PARTITION_FIELD_FORMAT_PATH_CONFIG, PARTITION_FIELD_FORMAT_PATH_DEFAULT); |
79 | 50 |
|
80 |
| - return timeZone.convertLocalToUTC(partitionedTime, false); |
81 |
| - |
| 51 | + this.partitionFieldExtractor = new PartitionFieldExtractor(fieldNames, formatPath); |
82 | 52 | }
|
83 |
| - |
84 |
| - public String encodePartition(SinkRecord sinkRecord, long nowInMillis) { |
85 | 53 |
|
86 |
| - final Long timestamp = this.timestampExtractor.extract(sinkRecord, nowInMillis); |
87 |
| - final String partitionField = this.partitionFieldExtractor.extract(sinkRecord); |
| 54 | + public String encodePartition(final SinkRecord sinkRecord, final long nowInMillis) { |
| 55 | + final String partitionsForTimestamp = super.encodePartition(sinkRecord, nowInMillis); |
| 56 | + final String partitionsForFields = this.partitionFieldExtractor.extract(sinkRecord); |
| 57 | + final String partition = String.join(this.delim, partitionsForFields, partitionsForTimestamp); |
88 | 58 |
|
89 |
| - return this.encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionField); |
| 59 | + log.info("Encoded partition : {}", partition); |
90 | 60 |
|
| 61 | + return partition; |
91 | 62 | }
|
92 | 63 |
|
93 |
| - public String encodePartition(SinkRecord sinkRecord) { |
| 64 | + public String encodePartition(final SinkRecord sinkRecord) { |
| 65 | + final String partitionsForTimestamp = super.encodePartition(sinkRecord); |
| 66 | + final String partitionsForFields = this.partitionFieldExtractor.extract(sinkRecord); |
| 67 | + final String partition = String.join(this.delim, partitionsForFields, partitionsForTimestamp); |
94 | 68 |
|
95 |
| - final Long timestamp = this.timestampExtractor.extract(sinkRecord); |
96 |
| - final String partitionFieldValue = this.partitionFieldExtractor.extract(sinkRecord); |
97 |
| - |
98 |
| - return encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionFieldValue); |
| 69 | + log.info("Encoded partition : {}", partition); |
99 | 70 |
|
| 71 | + return partition; |
100 | 72 | }
|
101 | 73 |
|
102 |
| - private String encodedPartitionForFieldAndTime(SinkRecord sinkRecord, Long timestamp, String partitionField) { |
103 |
| - |
104 |
| - if (timestamp == null) { |
105 |
| - |
106 |
| - final String msg = "Unable to determine timestamp using timestamp.extractor " + this.timestampExtractor.getClass().getName() + " for record: " + sinkRecord; |
107 |
| - log.error(msg); |
108 |
| - throw new ConnectException(msg); |
109 |
| - |
110 |
| - } else if (partitionField == null) { |
| 74 | + public static class PartitionFieldExtractor { |
111 | 75 |
|
112 |
| - final String msg = "Unable to determine partition field using partition.field '" + partitionField + "' for record: " + sinkRecord; |
113 |
| - log.error(msg); |
114 |
| - throw new ConnectException(msg); |
| 76 | + private static final String DELIMITER_EQ = "="; |
115 | 77 |
|
116 |
| - } else { |
| 78 | + private final boolean formatPath; |
| 79 | + private final List<String> fieldNames; |
117 | 80 |
|
118 |
| - final DateTime bucket = new DateTime(getPartition(this.partitionDurationMs, timestamp.longValue(), this.formatter.getZone())); |
119 |
| - return partitionField + this.delim + bucket.toString(this.formatter); |
120 |
| - |
| 81 | + PartitionFieldExtractor(final List<String> fieldNames, final boolean formatPath) { |
| 82 | + this.fieldNames = fieldNames; |
| 83 | + this.formatPath = formatPath; |
121 | 84 | }
|
122 |
| - } |
123 |
| - |
124 |
| - static class PartitionFieldExtractor { |
125 | 85 |
|
126 |
| - private final String fieldName; |
| 86 | + public String extract(final ConnectRecord<?> record) { |
127 | 87 |
|
128 |
| - PartitionFieldExtractor(String fieldName) { |
129 |
| - this.fieldName = fieldName; |
130 |
| - } |
| 88 | + final Object value = record.value(); |
131 | 89 |
|
132 |
| - String extract(ConnectRecord<?> record) { |
| 90 | + final StringBuilder builder = new StringBuilder(); |
133 | 91 |
|
134 |
| - Object value = record.value(); |
| 92 | + for (final String fieldName : this.fieldNames) { |
135 | 93 |
|
136 |
| - if (value instanceof Struct) { |
| 94 | + if (builder.length() != 0) { |
| 95 | + builder.append(StorageCommonConfig.DIRECTORY_DELIM_DEFAULT); |
| 96 | + } |
137 | 97 |
|
138 |
| - final Object field = DataUtils.getNestedFieldValue(value, fieldName); |
139 |
| - final Schema fieldSchema = DataUtils.getNestedField(record.valueSchema(), fieldName).schema(); |
| 98 | + if (value instanceof Struct || value instanceof Map) { |
140 | 99 |
|
141 |
| - FieldAndTimeBasedPartitioner.log.error("Unsupported type '{}' for partition field.", fieldSchema.type().getName()); |
| 100 | + final String partitionField = (String) DataUtils.getNestedFieldValue(value, fieldName); |
142 | 101 |
|
143 |
| - return (String) field; |
| 102 | + if (formatPath) { |
| 103 | + builder.append(String.join(DELIMITER_EQ, fieldName, partitionField)); |
| 104 | + } else { |
| 105 | + builder.append(partitionField); |
| 106 | + } |
| 107 | + |
| 108 | + } else { |
| 109 | + log.error("Value is not of Struct or Map type."); |
| 110 | + throw new PartitionException("Error encoding partition."); |
| 111 | + } |
144 | 112 |
|
145 |
| - } else if (value instanceof Map) { |
146 |
| - |
147 |
| - return (String) DataUtils.getNestedFieldValue(value, fieldName); |
| 113 | + } |
148 | 114 |
|
149 |
| - } else { |
| 115 | + return builder.toString(); |
150 | 116 |
|
151 |
| - FieldAndTimeBasedPartitioner.log.error("Value is not of Struct or Map type."); |
152 |
| - throw new PartitionException("Error encoding partition."); |
153 |
| - |
154 |
| - } |
155 | 117 | }
|
156 | 118 | }
|
157 | 119 |
|
158 |
| - @Override |
159 |
| - public long getPartitionDurationMs() { |
160 |
| - return partitionDurationMs; |
161 |
| - } |
162 |
| - |
163 |
| - @Override |
164 |
| - public TimestampExtractor getTimestampExtractor() { |
165 |
| - return timestampExtractor; |
166 |
| - } |
167 | 120 | }
|
0 commit comments