Skip to content

Commit 4dcf459

Browse files
committed
feat: add realtimeSegmentsMode query context param
1 parent 1d87a74 commit 4dcf459

6 files changed

Lines changed: 225 additions & 13 deletions

File tree

docs/querying/query-context-reference.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ Unless otherwise noted, the following parameters apply to all query types, and t
7171
|`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
7272
|`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.|
7373
|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `preferClones`. `excludeClones` means that clone Historicals are not queried by the broker. `preferClones` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones. Historicals which are not involved in the cloning process will still be queried. `includeClones` means that broker queries any Historical without regarding clone status. This parameter only affects native queries. MSQ does not query Historicals directly.|
74-
|`realtimeSegmentsOnly` |`false`| When set to true, only query realtime segments. Historical segments are excluded. |
74+
|`realtimeSegmentsMode` |`include`| Controls which segments are queried, classified by whether a historical replica exists. `include` queries all segments. `exclusive` queries only segments served solely by realtime servers; any segment with at least one historical replica (including segments mid-handoff) is excluded. `exclude` is the inverse: segments served solely by realtime servers are skipped, but segments mid-handoff that have both a realtime and a historical replica are still queried. |
75+
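|`realtimeSegmentsOnly` |`false`| **Deprecated.** Use `realtimeSegmentsMode=exclusive` instead. When set to `true` and `realtimeSegmentsMode` is not set, this is equivalent to `realtimeSegmentsMode=exclusive`. If `realtimeSegmentsMode` is set, it takes precedence and this parameter is ignored. |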
7576

7677
## Parameters by query type
7778

processing/src/main/java/org/apache/druid/query/QueryContext.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.druid.java.util.common.HumanReadableBytes;
2828
import org.apache.druid.java.util.common.StringUtils;
2929
import org.apache.druid.java.util.common.granularity.Granularity;
30+
import org.apache.druid.query.QueryContexts.RealtimeSegmentsMode;
3031
import org.apache.druid.query.QueryContexts.Vectorize;
3132
import org.apache.druid.query.filter.InDimFilter;
3233
import org.apache.druid.query.filter.TypedInFilter;
@@ -781,8 +782,34 @@ public boolean isPrePlanned()
781782
return getBoolean(QueryContexts.CTX_PREPLANNED, QueryContexts.DEFAULT_PREPLANNED);
782783
}
783784

785+
/**
786+
* Returns the realtime segments mode for this query. If {@code realtimeSegmentsMode} is absent
787+
* or null, falls back to the deprecated {@code realtimeSegmentsOnly} boolean: {@code true} maps
788+
* to {@link RealtimeSegmentsMode#EXCLUSIVE}; otherwise returns {@link RealtimeSegmentsMode#INCLUDE}.
789+
*/
790+
public RealtimeSegmentsMode getRealtimeSegmentsMode()
791+
{
792+
RealtimeSegmentsMode mode = getEnum(
793+
QueryContexts.REALTIME_SEGMENTS_MODE,
794+
RealtimeSegmentsMode.class,
795+
null
796+
);
797+
if (mode != null) {
798+
return mode;
799+
}
800+
// Backward-compat: honour the deprecated realtimeSegmentsOnly flag.
801+
if (getBoolean(QueryContexts.REALTIME_SEGMENTS_ONLY, QueryContexts.DEFAULT_REALTIME_SEGMENTS_ONLY)) {
802+
return RealtimeSegmentsMode.EXCLUSIVE;
803+
}
804+
return QueryContexts.DEFAULT_REALTIME_SEGMENTS_MODE;
805+
}
806+
807+
/**
808+
* @deprecated Use {@link #getRealtimeSegmentsMode()} instead.
809+
*/
810+
@Deprecated
784811
public boolean isRealtimeSegmentsOnly()
785812
{
786-
return getBoolean(QueryContexts.REALTIME_SEGMENTS_ONLY, QueryContexts.DEFAULT_REALTIME_SEGMENTS_ONLY);
813+
return getRealtimeSegmentsMode() == RealtimeSegmentsMode.EXCLUSIVE;
787814
}
788815
}

processing/src/main/java/org/apache/druid/query/QueryContexts.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,20 @@ public class QueryContexts
146146
public static final String NATIVE_QUERY_SQL_PLANNING_MODE_COUPLED = "COUPLED";
147147
public static final String NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED = "DECOUPLED";
148148

149+
/**
150+
* @deprecated Use {@link #REALTIME_SEGMENTS_MODE} instead.
151+
*/
152+
@Deprecated
149153
public static final String REALTIME_SEGMENTS_ONLY = "realtimeSegmentsOnly";
154+
/**
155+
* @deprecated Use {@link #DEFAULT_REALTIME_SEGMENTS_MODE} instead.
156+
*/
157+
@Deprecated
150158
public static final boolean DEFAULT_REALTIME_SEGMENTS_ONLY = false;
151159

160+
public static final String REALTIME_SEGMENTS_MODE = "realtimeSegmentsMode";
161+
public static final RealtimeSegmentsMode DEFAULT_REALTIME_SEGMENTS_MODE = RealtimeSegmentsMode.INCLUDE;
162+
152163
public static final String CTX_PREPLANNED = "prePlanned";
153164
public static final boolean DEFAULT_PREPLANNED = true;
154165

@@ -233,6 +244,39 @@ public String toString()
233244
}
234245
}
235246

247+
/**
248+
* Classifies segments by whether a historical replica exists
249+
* (see {@link org.apache.druid.client.selector.ServerSelector#isRealtimeSegment()}: a segment is
250+
* "realtime" only when it has realtime servers and zero historical servers).
251+
*/
252+
public enum RealtimeSegmentsMode
253+
{
254+
/** Include all segments (default). */
255+
INCLUDE,
256+
/** Include only segments served solely by realtime servers; any segment with a historical replica
257+
* (including segments mid-handoff) is excluded. */
258+
EXCLUSIVE,
259+
/** Exclude segments served solely by realtime servers; segments mid-handoff with both realtime
260+
* and historical replicas are still included. */
261+
EXCLUDE;
262+
263+
@JsonCreator
264+
public static RealtimeSegmentsMode fromString(String str)
265+
{
266+
if (str == null) {
267+
return null;
268+
}
269+
return RealtimeSegmentsMode.valueOf(StringUtils.toUpperCase(str));
270+
}
271+
272+
@Override
273+
@JsonValue
274+
public String toString()
275+
{
276+
return StringUtils.toLowerCase(name());
277+
}
278+
}
279+
236280
private QueryContexts()
237281
{
238282
}

processing/src/test/java/org/apache/druid/query/QueryContextTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,7 @@ public void testNonLegacyIsNotLegacyContext()
430430
}
431431

432432
@Test
433+
@SuppressWarnings("deprecation")
433434
public void testIsRealtimeSegmentsOnly()
434435
{
435436
assertFalse(QueryContext.empty().isRealtimeSegmentsOnly());
@@ -440,6 +441,66 @@ public void testIsRealtimeSegmentsOnly()
440441
);
441442
}
442443

444+
@Test
445+
public void testGetRealtimeSegmentsMode()
446+
{
447+
assertEquals(
448+
QueryContexts.RealtimeSegmentsMode.INCLUDE,
449+
QueryContext.empty().getRealtimeSegmentsMode()
450+
);
451+
assertEquals(
452+
QueryContexts.RealtimeSegmentsMode.EXCLUSIVE,
453+
QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, "exclusive"))
454+
.getRealtimeSegmentsMode()
455+
);
456+
assertEquals(
457+
QueryContexts.RealtimeSegmentsMode.EXCLUDE,
458+
QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, "exclude"))
459+
.getRealtimeSegmentsMode()
460+
);
461+
assertEquals(
462+
QueryContexts.RealtimeSegmentsMode.INCLUDE,
463+
QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, "include"))
464+
.getRealtimeSegmentsMode()
465+
);
466+
}
467+
468+
@Test
469+
public void testGetRealtimeSegmentsModeBackwardCompat()
470+
{
471+
// realtimeSegmentsOnly=true maps to EXCLUSIVE
472+
assertEquals(
473+
QueryContexts.RealtimeSegmentsMode.EXCLUSIVE,
474+
QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_ONLY, true))
475+
.getRealtimeSegmentsMode()
476+
);
477+
// realtimeSegmentsOnly=false maps to INCLUDE (default)
478+
assertEquals(
479+
QueryContexts.RealtimeSegmentsMode.INCLUDE,
480+
QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_ONLY, false))
481+
.getRealtimeSegmentsMode()
482+
);
483+
// realtimeSegmentsMode takes precedence over realtimeSegmentsOnly
484+
assertEquals(
485+
QueryContexts.RealtimeSegmentsMode.EXCLUDE,
486+
QueryContext.of(ImmutableMap.of(
487+
QueryContexts.REALTIME_SEGMENTS_ONLY, true,
488+
QueryContexts.REALTIME_SEGMENTS_MODE, "exclude"
489+
)).getRealtimeSegmentsMode()
490+
);
491+
}
492+
493+
@Test
494+
public void testGetRealtimeSegmentsModeInvalidValue()
495+
{
496+
BadQueryContextException e = assertThrows(
497+
BadQueryContextException.class,
498+
() -> QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, "badvalue"))
499+
.getRealtimeSegmentsMode()
500+
);
501+
assertTrue(e.getMessage().contains("realtimeSegmentsMode"));
502+
}
503+
443504
@Test
444505
public void testSerialization() throws Exception
445506
{

server/src/main/java/org/apache/druid/client/CachingClusteredClient.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.druid.query.Query;
6262
import org.apache.druid.query.QueryContext;
6363
import org.apache.druid.query.QueryContexts;
64+
import org.apache.druid.query.QueryContexts.RealtimeSegmentsMode;
6465
import org.apache.druid.query.QueryMetrics;
6566
import org.apache.druid.query.QueryPlus;
6667
import org.apache.druid.query.QueryRunner;
@@ -444,7 +445,7 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
444445
final Set<SegmentServerSelector> segments = new LinkedHashSet<>();
445446
final SegmentPruner segmentPruner = ev.getSegmentPruner();
446447

447-
boolean isRealtimeSegmentOnly = query.context().isRealtimeSegmentsOnly();
448+
RealtimeSegmentsMode realtimeSegmentsMode = query.context().getRealtimeSegmentsMode();
448449
// Filter unneeded chunks based on partition dimension
449450
for (TimelineObjectHolder<String, ServerSelector> holder : serversLookup) {
450451
final Collection<PartitionChunk<ServerSelector>> filteredChunks;
@@ -458,8 +459,19 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
458459
}
459460
for (PartitionChunk<ServerSelector> chunk : filteredChunks) {
460461
ServerSelector server = chunk.getObject();
461-
if (isRealtimeSegmentOnly && !server.isRealtimeSegment()) {
462-
continue; // Skip historical segments when only realtime segments are requested
462+
switch (realtimeSegmentsMode) {
463+
case EXCLUSIVE:
464+
if (!server.isRealtimeSegment()) {
465+
continue;
466+
}
467+
break;
468+
case EXCLUDE:
469+
if (server.isRealtimeSegment()) {
470+
continue;
471+
}
472+
break;
473+
case INCLUDE:
474+
break;
463475
}
464476
final SegmentDescriptor segment = new SegmentDescriptor(
465477
holder.getInterval(),

server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java

Lines changed: 75 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3132,27 +3132,94 @@ public void testRealtimeSegmentsQueryContext()
31323132
selector.addServerAndUpdateSegment(new QueryableDruidServer(servers[0], null), dataSegment);
31333133
timeline.add(interval, "ver", new SingleElementPartitionChunk<>(selector));
31343134

3135-
final TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder()
3135+
// include (default): historical segment is included
3136+
final TimeBoundaryQuery queryInclude = Druids.newTimeBoundaryQueryBuilder()
31363137
.dataSource(DATA_SOURCE)
31373138
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
3138-
.context(ImmutableMap.of("realtimeSegmentsOnly", false))
3139+
.context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, "include"))
31393140
.randomQueryId()
31403141
.build();
31413142

3142-
final TimeBoundaryQuery query2 = Druids.newTimeBoundaryQueryBuilder()
3143+
// exclusive: only realtime segments — historical segment is excluded
3144+
final TimeBoundaryQuery queryExclusive = Druids.newTimeBoundaryQueryBuilder()
3145+
.dataSource(DATA_SOURCE)
3146+
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
3147+
.context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, "exclusive"))
3148+
.randomQueryId()
3149+
.build();
3150+
3151+
// backward compat: realtimeSegmentsOnly=true maps to EXCLUSIVE
3152+
final TimeBoundaryQuery queryLegacyTrue = Druids.newTimeBoundaryQueryBuilder()
31433153
.dataSource(DATA_SOURCE)
31443154
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
3145-
.context(ImmutableMap.of("realtimeSegmentsOnly", true))
3155+
.context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_ONLY, true))
31463156
.randomQueryId()
31473157
.build();
31483158

31493159
final ResponseContext responseContext = initializeResponseContext();
31503160

3151-
getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext);
3152-
getDefaultQueryRunner().run(QueryPlus.wrap(query2), responseContext);
3161+
getDefaultQueryRunner().run(QueryPlus.wrap(queryInclude), responseContext);
3162+
getDefaultQueryRunner().run(QueryPlus.wrap(queryExclusive), responseContext);
3163+
getDefaultQueryRunner().run(QueryPlus.wrap(queryLegacyTrue), responseContext);
3164+
3165+
final Map<String, Integer> remainingResponseMap = (Map<String, Integer>) responseContext.get(ResponseContext.Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS);
3166+
Assert.assertEquals(1, remainingResponseMap.get(queryInclude.getId()).intValue());
3167+
Assert.assertEquals(0, remainingResponseMap.get(queryExclusive.getId()).intValue());
3168+
Assert.assertEquals(0, remainingResponseMap.get(queryLegacyTrue.getId()).intValue());
3169+
}
3170+
3171+
@Test
3172+
public void testRealtimeSegmentsModeExclude()
3173+
{
3174+
final Interval interval = Intervals.of("2016-01-01/2016-01-02");
3175+
final Interval queryInterval = Intervals.of("2016-01-01T14:00:00/2016-01-02T14:00:00");
3176+
final DataSegment dataSegment = new DataSegment(
3177+
"dataSource",
3178+
interval,
3179+
"ver",
3180+
ImmutableMap.of("type", "hdfs", "path", "/tmp"),
3181+
ImmutableList.of("product"),
3182+
ImmutableList.of("visited_sum"),
3183+
NoneShardSpec.instance(),
3184+
9,
3185+
12334
3186+
);
3187+
3188+
// selector backed only by a realtime server — isRealtimeSegment() == true
3189+
final DruidServer realtimeServer = new DruidServer(
3190+
"rt1", "rt1", null, 10, null, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0
3191+
);
3192+
final ServerSelector realtimeSelector = new ServerSelector(
3193+
dataSegment,
3194+
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()),
3195+
HistoricalFilter.IDENTITY_FILTER
3196+
);
3197+
realtimeSelector.addServerAndUpdateSegment(new QueryableDruidServer(realtimeServer, null), dataSegment);
3198+
timeline.add(interval, "ver", new SingleElementPartitionChunk<>(realtimeSelector));
3199+
3200+
// exclude: realtime-only segment is skipped
3201+
final TimeBoundaryQuery queryExclude = Druids.newTimeBoundaryQueryBuilder()
3202+
.dataSource(DATA_SOURCE)
3203+
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
3204+
.context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, "exclude"))
3205+
.randomQueryId()
3206+
.build();
3207+
3208+
// include: realtime-only segment is included
3209+
final TimeBoundaryQuery queryInclude = Druids.newTimeBoundaryQueryBuilder()
3210+
.dataSource(DATA_SOURCE)
3211+
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
3212+
.context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, "include"))
3213+
.randomQueryId()
3214+
.build();
3215+
3216+
final ResponseContext responseContext = initializeResponseContext();
3217+
getDefaultQueryRunner().run(QueryPlus.wrap(queryExclude), responseContext);
3218+
getDefaultQueryRunner().run(QueryPlus.wrap(queryInclude), responseContext);
3219+
31533220
final Map<String, Integer> remainingResponseMap = (Map<String, Integer>) responseContext.get(ResponseContext.Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS);
3154-
Assert.assertEquals(1, remainingResponseMap.get(query.getId()).intValue());
3155-
Assert.assertEquals(0, remainingResponseMap.get(query2.getId()).intValue());
3221+
Assert.assertEquals(0, remainingResponseMap.get(queryExclude.getId()).intValue());
3222+
Assert.assertEquals(1, remainingResponseMap.get(queryInclude.getId()).intValue());
31563223
}
31573224

31583225
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)