Skip to content

Commit e889201

Browse files
authored
Fix Hadoop multi-value string null value handling to match native batch (#18944)
Doing some more digging, I found another unfortunate data difference between native batch (on-cluster) and Hadoop batch ingest. Ingesting a multi-value string ["a","b",null] with Hadoop is treated as ["a","b","null"] and in native batch, this correctly ingests to ["a","b",null]. This is difference appears to be a bug in all Druid versions(even latest). While this will not affect the current null handling migration, this will affect the future Hadoop -> native batch ingestion migration that will also need to take place. Hadoop doesn't allow for all-null columns in segments, it simply excludes them from the segment. I've updated the Hadoop job to support running druid.indexer.task.storeEmptyColumns=true, which allows us to store all NULL columns (how native/streaming ingest work today). BREAKING CHANGES 1. Hadoop ingests will now process multi-value string inputs like ["a","b",null] -> ["a","b",null] instead of ["a","b","null"] to match native batch ingestion. 2. Hadoop ingests will now by default keep columns with all NULL values, instead of excluding them from the segment. useStringValueOfNullInLists parameter in RowBasedColumnSelectorFactory.java‎ has been removed.
1 parent cec4f5b commit e889201

28 files changed

Lines changed: 728 additions & 61 deletions

File tree

extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
4646
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
4747
import org.apache.druid.java.util.common.parsers.ParseException;
48+
import org.apache.druid.math.expr.Evals;
4849
import org.junit.Assert;
4950
import org.junit.Before;
5051
import org.junit.Test;
@@ -71,6 +72,7 @@
7172
import java.util.List;
7273
import java.util.Map;
7374
import java.util.regex.Pattern;
75+
import java.util.stream.Collectors;
7476

7577
public class AvroStreamInputRowParserTest
7678
{
@@ -366,8 +368,9 @@ static void assertInputRowCorrect(InputRow inputRow, List<String> expectedDimens
366368
Lists.transform(SOME_INT_ARRAY_VALUE, String::valueOf),
367369
inputRow.getDimension("someIntArray")
368370
);
371+
// For string array, nulls are preserved so use ArrayList (ImmutableList doesn't support nulls)
369372
Assert.assertEquals(
370-
Lists.transform(SOME_STRING_ARRAY_VALUE, String::valueOf),
373+
SOME_STRING_ARRAY_VALUE.stream().map(Evals::asString).collect(Collectors.toList()),
371374
inputRow.getDimension("someStringArray")
372375
);
373376

indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@
5252
import org.apache.druid.java.util.common.jackson.JacksonUtils;
5353
import org.apache.druid.segment.IndexIO;
5454
import org.apache.druid.segment.IndexMerger;
55-
import org.apache.druid.segment.IndexMergerV9;
55+
import org.apache.druid.segment.IndexMergerV10Factory;
56+
import org.apache.druid.segment.IndexMergerV9Factory;
5657
import org.apache.druid.segment.IndexSpec;
5758
import org.apache.druid.segment.loading.DataSegmentPusher;
5859
import org.apache.druid.server.DruidNode;
@@ -88,12 +89,14 @@ public class HadoopDruidIndexerConfig
8889
private static final Injector INJECTOR;
8990

9091
static final String CONFIG_PROPERTY = "druid.indexer.config";
92+
private static final String STORE_EMPTY_COLUMNS_KEY = "druid.indexer.task.storeEmptyColumns";
93+
private static final String BUILD_V10_KEY = "druid.indexer.task.buildV10";
9194
static final Charset JAVA_NATIVE_CHARSET = Charset.forName("Unicode");
9295
static final Splitter TAB_SPLITTER = Splitter.on("\t");
9396
static final Joiner TAB_JOINER = Joiner.on("\t");
9497
public static final ObjectMapper JSON_MAPPER;
9598
public static final IndexIO INDEX_IO;
96-
static final IndexMerger INDEX_MERGER_V9; // storeEmptyColumns is off for this indexMerger
99+
static final IndexMerger INDEX_MERGER;
97100
static final HadoopKerberosConfig HADOOP_KERBEROS_CONFIG;
98101
static final DataSegmentPusher DATA_SEGMENT_PUSHER;
99102
private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";
@@ -129,10 +132,17 @@ public class HadoopDruidIndexerConfig
129132
);
130133
JSON_MAPPER = INJECTOR.getInstance(ObjectMapper.class);
131134
INDEX_IO = INJECTOR.getInstance(IndexIO.class);
132-
INDEX_MERGER_V9 = INJECTOR.getInstance(IndexMergerV9.class);
133135
HADOOP_KERBEROS_CONFIG = INJECTOR.getInstance(HadoopKerberosConfig.class);
134136
DATA_SEGMENT_PUSHER = INJECTOR.getInstance(DataSegmentPusher.class);
135137
PROPERTIES = INJECTOR.getInstance(Properties.class);
138+
139+
boolean buildV10 = Boolean.parseBoolean(PROPERTIES.getProperty(BUILD_V10_KEY, "false"));
140+
if (buildV10) {
141+
INDEX_MERGER = INJECTOR.getInstance(IndexMergerV10Factory.class).create();
142+
} else {
143+
boolean storeEmptyColumns = Boolean.parseBoolean(PROPERTIES.getProperty(STORE_EMPTY_COLUMNS_KEY, "true"));
144+
INDEX_MERGER = INJECTOR.getInstance(IndexMergerV9Factory.class).create(storeEmptyColumns);
145+
}
136146
}
137147

138148
public enum IndexJobCounters
@@ -262,6 +272,7 @@ public HadoopDruidIndexerConfig(
262272
this.allowedHadoopPrefix.add("druid.javascript");
263273
this.allowedHadoopPrefix.addAll(DATA_SEGMENT_PUSHER.getAllowedPropertyPrefixesForHadoop());
264274
this.allowedHadoopPrefix.addAll(spec.getTuningConfig().getUserAllowedHadoopPrefix());
275+
this.allowedHadoopPrefix.add("druid.indexer");
265276
}
266277

267278
@JsonProperty(value = "spec")

indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,7 @@ private File persist(
602602
final ProgressIndicator progressIndicator
603603
) throws IOException
604604
{
605-
return HadoopDruidIndexerConfig.INDEX_MERGER_V9
605+
return HadoopDruidIndexerConfig.INDEX_MERGER
606606
.persist(index, interval, file, config.getIndexSpecForIntermediatePersists(), progressIndicator, null);
607607
}
608608

@@ -614,7 +614,7 @@ protected File mergeQueryableIndex(
614614
) throws IOException
615615
{
616616
boolean rollup = config.getSchema().getDataSchema().getGranularitySpec().isRollup();
617-
return HadoopDruidIndexerConfig.INDEX_MERGER_V9
617+
return HadoopDruidIndexerConfig.INDEX_MERGER
618618
.mergeQueryableIndex(
619619
indexes,
620620
rollup,

indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -372,9 +372,9 @@ private static void writeBytes(@Nullable byte[] value, ByteArrayDataOutput out)
372372
}
373373
}
374374

375-
private static void writeString(String value, ByteArrayDataOutput out) throws IOException
375+
private static void writeString(@Nullable String value, ByteArrayDataOutput out) throws IOException
376376
{
377-
writeBytes(StringUtils.toUtf8(value), out);
377+
writeBytes(StringUtils.toUtf8Nullable(value), out);
378378
}
379379

380380
private static void writeStringArray(List<String> values, ByteArrayDataOutput out) throws IOException
@@ -389,15 +389,20 @@ private static void writeStringArray(List<String> values, ByteArrayDataOutput ou
389389
}
390390
}
391391

392+
@Nullable
392393
private static String readString(DataInput in) throws IOException
393394
{
394395
byte[] result = readBytes(in);
395-
return StringUtils.fromUtf8(result);
396+
return StringUtils.fromUtf8Nullable(result);
396397
}
397398

399+
@Nullable
398400
private static byte[] readBytes(DataInput in) throws IOException
399401
{
400402
int size = WritableUtils.readVInt(in);
403+
if (size < 0) {
404+
return null;
405+
}
401406
byte[] result = new byte[size];
402407
in.readFully(result, 0, size);
403408
return result;
@@ -449,7 +454,9 @@ public static InputRow fromBytes(
449454

450455
if (typeHelper.getType() == ValueType.STRING) {
451456
List<String> dimensionValues = (List<String>) dimValues;
452-
if (dimensionValues.size() == 1) {
457+
// Preserve single-element lists that contain null (e.g., [null]) instead of unwrapping to null,
458+
// which would then become [] when getDimension() is called. This ensures parity with native batch ingestion.
459+
if (dimensionValues.size() == 1 && dimensionValues.get(0) != null) {
453460
event.put(dimension, dimensionValues.get(0));
454461
} else {
455462
event.put(dimension, dimensionValues);

indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
import org.apache.druid.java.util.common.DateTimes;
3939
import org.apache.druid.java.util.common.Intervals;
4040
import org.apache.druid.java.util.common.granularity.Granularities;
41+
import org.apache.druid.segment.IndexMerger;
42+
import org.apache.druid.segment.IndexMergerV10;
43+
import org.apache.druid.segment.IndexMergerV9;
4144
import org.apache.druid.segment.indexing.DataSchema;
4245
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
4346
import org.apache.druid.timeline.partition.HashPartitionFunction;
@@ -46,11 +49,13 @@
4649
import org.junit.Test;
4750

4851
import javax.annotation.Nullable;
52+
import java.lang.reflect.Field;
4953
import java.util.ArrayList;
5054
import java.util.Arrays;
5155
import java.util.Collections;
5256
import java.util.List;
5357
import java.util.Map;
58+
import java.util.Properties;
5459
import java.util.Set;
5560

5661
public class HadoopDruidIndexerConfigTest
@@ -214,6 +219,79 @@ public void testGetTargetPartitionSizeWithSingleDimensionPartitionsMaxRowsPerSeg
214219
Assert.assertEquals(maxRowsPerSegment, targetPartitionSize);
215220
}
216221

222+
/**
223+
* Tests that INDEX_MERGER is configured correctly based on the properties.
224+
* Verifies the merger type (V9 or V10) and storeEmptyColumns flag match what properties dictate.
225+
*/
226+
@Test
227+
public void testIndexMergerMatchesProperties() throws Exception
228+
{
229+
IndexMerger indexMerger = HadoopDruidIndexerConfig.INDEX_MERGER;
230+
Properties properties = HadoopDruidIndexerConfig.PROPERTIES;
231+
232+
boolean buildV10 = Boolean.parseBoolean(properties.getProperty("druid.indexer.task.buildV10", "false"));
233+
boolean expectedStoreEmptyColumns = buildV10 ||
234+
Boolean.parseBoolean(properties.getProperty("druid.indexer.task.storeEmptyColumns", "true"));
235+
236+
if (buildV10) {
237+
Assert.assertTrue(
238+
"When buildV10=true, INDEX_MERGER should be IndexMergerV10",
239+
indexMerger instanceof IndexMergerV10
240+
);
241+
} else {
242+
Assert.assertTrue(
243+
"When buildV10=false, INDEX_MERGER should be IndexMergerV9",
244+
indexMerger instanceof IndexMergerV9
245+
);
246+
// Use reflection to verify storeEmptyColumns on IndexMergerV9
247+
Field storeEmptyColumnsField = IndexMergerV9.class.getDeclaredField("storeEmptyColumns");
248+
storeEmptyColumnsField.setAccessible(true);
249+
boolean actualStoreEmptyColumns = (boolean) storeEmptyColumnsField.get(indexMerger);
250+
Assert.assertEquals(
251+
"storeEmptyColumns flag should match property value",
252+
expectedStoreEmptyColumns,
253+
actualStoreEmptyColumns
254+
);
255+
}
256+
}
257+
258+
/**
259+
* Tests that druid.indexer.task properties are passed to Hadoop jobs via getAllowedProperties().
260+
*/
261+
@Test
262+
public void testIndexerPropertiesArePassedToHadoopJobs()
263+
{
264+
HadoopIngestionSpec spec = new HadoopIngestionSpecBuilder().build();
265+
HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig(spec);
266+
267+
String storeEmptyColumnsKey = "druid.indexer.task.storeEmptyColumns";
268+
String buildV10Key = "druid.indexer.task.buildV10";
269+
String originalStoreEmpty = HadoopDruidIndexerConfig.PROPERTIES.getProperty(storeEmptyColumnsKey);
270+
String originalBuildV10 = HadoopDruidIndexerConfig.PROPERTIES.getProperty(buildV10Key);
271+
272+
try {
273+
HadoopDruidIndexerConfig.PROPERTIES.setProperty(storeEmptyColumnsKey, "true");
274+
HadoopDruidIndexerConfig.PROPERTIES.setProperty(buildV10Key, "true");
275+
276+
Map<String, String> allowedProperties = config.getAllowedProperties();
277+
Assert.assertEquals("true", allowedProperties.get(storeEmptyColumnsKey));
278+
Assert.assertEquals("true", allowedProperties.get(buildV10Key));
279+
}
280+
finally {
281+
restoreProperty(storeEmptyColumnsKey, originalStoreEmpty);
282+
restoreProperty(buildV10Key, originalBuildV10);
283+
}
284+
}
285+
286+
private void restoreProperty(String key, String originalValue)
287+
{
288+
if (originalValue != null) {
289+
HadoopDruidIndexerConfig.PROPERTIES.setProperty(key, originalValue);
290+
} else {
291+
HadoopDruidIndexerConfig.PROPERTIES.remove(key);
292+
}
293+
}
294+
217295
private static class HadoopIngestionSpecBuilder
218296
{
219297
private static final DataSchema DATA_SCHEMA =

0 commit comments

Comments
 (0)