Skip to content

Commit 8127d2c

Browse files
authored
Fix issue with Integer Joda timestamp coercion in DeterminePartitionsJob (apache#18327)
Timestamp columns whose values fit in 32 bits are coerced to Integer, which causes the Joda DateTime constructor to break.
1 parent 9a99cd8 commit 8127d2c

2 files changed

Lines changed: 26 additions & 9 deletions

File tree

indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,10 @@ protected void map(BytesWritable key, NullWritable value, Context context) throw
414414
{
415415
final List<Object> timeAndDims = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(key.getBytes(), List.class);
416416

417-
final DateTime timestamp = new DateTime(timeAndDims.get(0), ISOChronology.getInstanceUTC());
417+
final Object timestampObj = timeAndDims.get(0);
418+
final long longTimestamp = ((Number) timestampObj).longValue();
419+
final DateTime timestamp = new DateTime(longTimestamp, ISOChronology.getInstanceUTC());
420+
418421
final Map<String, Iterable<String>> dims = (Map<String, Iterable<String>>) timeAndDims.get(1);
419422

420423
helper.emitDimValueCounts(context, timestamp, dims);

indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public class DeterminePartitionsJobTest
6565
@Parameterized.Parameters(name = "assumeGrouped={0}, "
6666
+ "targetRowsPerSegment={1}, "
6767
+ "maxRowsPerSegment={2}, "
68-
+ "interval={3}"
68+
+ "intervals={3}"
6969
+ "expectedNumOfSegments={4}, "
7070
+ "expectedNumOfShardsForEachSegment={5}, "
7171
+ "expectedStartEndForEachShard={6}, "
@@ -74,12 +74,26 @@ public static Collection<Object[]> constructFeed()
7474
{
7575
return Arrays.asList(
7676
new Object[][]{
77+
{
78+
false,
79+
1,
80+
NO_MAX_ROWS_PER_SEGMENT,
81+
List.of("1970-01-01T00:00:00Z/P1D"),
82+
1,
83+
new int[]{1},
84+
new String[][][]{
85+
{
86+
{null, null}
87+
}
88+
},
89+
ImmutableList.of("1970010100,c.example.com,CN,100")
90+
},
7791
{
7892
// Test partitioning by targetRowsPerSegment
7993
true,
8094
2,
8195
NO_MAX_ROWS_PER_SEGMENT,
82-
"2014-10-22T00:00:00Z/P1D",
96+
List.of("2014-10-22T00:00:00Z/P1D"),
8397
1,
8498
new int[]{5},
8599
new String[][][]{
@@ -108,7 +122,7 @@ public static Collection<Object[]> constructFeed()
108122
true,
109123
NO_TARGET_ROWS_PER_SEGMENT,
110124
2,
111-
"2014-10-22T00:00:00Z/P1D",
125+
List.of("2014-10-22T00:00:00Z/P1D"),
112126
1,
113127
new int[]{5},
114128
new String[][][]{
@@ -137,7 +151,7 @@ public static Collection<Object[]> constructFeed()
137151
false,
138152
NO_TARGET_ROWS_PER_SEGMENT,
139153
2,
140-
"2014-10-20T00:00:00Z/P1D",
154+
List.of("2014-10-20T00:00:00Z/P1D"),
141155
1,
142156
new int[]{5},
143157
new String[][][]{
@@ -176,7 +190,7 @@ public static Collection<Object[]> constructFeed()
176190
true,
177191
NO_TARGET_ROWS_PER_SEGMENT,
178192
5,
179-
"2014-10-20T00:00:00Z/P3D",
193+
List.of("2014-10-20T00:00:00Z/P3D"),
180194
3,
181195
new int[]{2, 2, 2},
182196
new String[][][]{
@@ -230,7 +244,7 @@ public static Collection<Object[]> constructFeed()
230244
true,
231245
NO_TARGET_ROWS_PER_SEGMENT,
232246
1000,
233-
"2014-10-22T00:00:00Z/P1D",
247+
List.of("2014-10-22T00:00:00Z/P1D"),
234248
1,
235249
new int[]{1},
236250
new String[][][]{
@@ -259,7 +273,7 @@ public DeterminePartitionsJobTest(
259273
boolean assumeGrouped,
260274
@Nullable Integer targetRowsPerSegment,
261275
Integer maxRowsPerSegment,
262-
String interval,
276+
List<String> intervals,
263277
int expectedNumOfSegments,
264278
int[] expectedNumOfShardsForEachSegment,
265279
String[][][] expectedStartEndForEachShard,
@@ -304,7 +318,7 @@ public DeterminePartitionsJobTest(
304318
new UniformGranularitySpec(
305319
Granularities.DAY,
306320
Granularities.NONE,
307-
ImmutableList.of(Intervals.of(interval))
321+
intervals.stream().map(Intervals::of).collect(ImmutableList.toImmutableList())
308322
)
309323
)
310324
.withObjectMapper(HadoopDruidIndexerConfig.JSON_MAPPER)

0 commit comments

Comments
 (0)