3 changes: 3 additions & 0 deletions docs/api-reference/sql-api.md
@@ -1286,6 +1286,9 @@ Getting the query results for an ingestion query returns an empty response.
* `resultFormat` (optional)
* Type: String
* Defines the format in which the results are presented. The following options are supported: `arrayLines`, `objectLines`, `array`, `object`, and `csv`. The default is `object`.
* `filename` (optional)
* Type: String
* If set, attaches a `Content-Disposition` header to the response with the value `attachment; filename="<filename>"`, where `<filename>` is the supplied value. The filename must not be empty, must not be longer than 255 characters, and must not contain the characters `/`, `\`, `:`, `*`, `?`, `"`, `<`, `>`, `|`, `\0`, `\n`, or `\r`.
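For illustration, these constraints can be pre-checked on the client before issuing the request. The sketch below mirrors the documented rules only; the helper name and pattern are assumptions, not Druid's actual server-side code:

```python
import re

# Mirrors the documented filename constraints (assumed helper, not Druid code):
# non-empty, at most 255 characters, and none of the forbidden characters.
FILENAME_PATTERN = re.compile(r'^[^/\\:*?"<>|\x00\n\r]*$')

def is_valid_filename(name: str) -> bool:
    """Return True if `name` satisfies the documented filename rules."""
    return 0 < len(name) <= 255 and FILENAME_PATTERN.match(name) is not None
```

A request supplying `filename=results.csv` would pass this check, while a name containing `/` or an empty string would be rejected by the server with an invalid-input error.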

#### Responses

4 changes: 2 additions & 2 deletions docs/configuration/index.md
@@ -1175,8 +1175,8 @@ The following properties pertain to segment metadata caching on the Overlord tha

|Property|Description|Default|
|--------|-----------|-------|
|`druid.manager.segments.useCache`|If `true`, segment metadata is cached on the Overlord to speed up metadata operations.|false|
|`druid.manager.segments.pollDuration`|Duration (in ISO 8601 format) between successive syncs of the cache with the metadata store. This property is used only when `druid.manager.segments.useCache` is `true`.|`PT1M` (1 minute)|
|`druid.manager.segments.useCache`|Denotes the usage mode of the segment metadata cache. Possible modes: (a) `never`: cache is disabled. (b) `always`: reads are always served from the cache; service start-up blocks until the cache has synced with the metadata store at least once, and transactions block until the cache has synced at least once after the service becomes leader. (c) `ifSynced`: reads are served from the cache only if it has already synced with the metadata store; this mode does not block service start-up or transactions.|`never`|
|`druid.manager.segments.pollDuration`|Duration (in ISO 8601 format) between successive syncs of the cache with the metadata store. This property is used only when `druid.manager.segments.useCache` is set to `always` or `ifSynced`.|`PT1M` (1 minute)|
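For example, an Overlord runtime configuration that serves metadata reads from the cache without blocking start-up might look like this (illustrative values):

```properties
# Serve segment metadata reads from the cache once it has synced with the
# metadata store; never block service start-up or transactions.
druid.manager.segments.useCache=ifSynced
# Sync the cache with the metadata store every minute (the default).
druid.manager.segments.pollDuration=PT1M
```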

#### Overlord dynamic configuration

21 changes: 14 additions & 7 deletions docs/operations/metrics.md
@@ -85,6 +85,13 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`subquery/fallback/unknownReason/count`|Number of subqueries that cannot be materialized as frames due to other reasons.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given row limit.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given byte limit.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Ideally 0, though a higher number doesn't necessarily indicate a problem.|
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
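These metrics are emitted only when the `GroupByStatsMonitor` is enabled on the service. A sketch of the relevant configuration (the monitor's class path is assumed here; check the monitors reference for the exact value):

```properties
druid.monitoring.monitors=["org.apache.druid.server.metrics.GroupByStatsMonitor"]
```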

### Historical

@@ -102,6 +109,13 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Ideally 0, though a higher number doesn't necessarily indicate a problem.|
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|

### Real-time

@@ -117,13 +131,6 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||

### GroupBy query metrics

These metrics are reported from broker, historical and real-time nodes

|Metric|Description|Normal value|
|------|-----------|------------|
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Should be ideally 0, though a higher number isn't representative of a problem.|
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
2 changes: 1 addition & 1 deletion docs/querying/multi-value-dimensions.md
@@ -252,7 +252,7 @@ results in:
```sql
SELECT label, tags
FROM "mvd_example_rollup"
WHERE tags = 't3'
WHERE label in ('row1','row2')
GROUP BY 1,2
```

@@ -56,6 +56,7 @@
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -113,7 +114,8 @@ public void setUp()
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
new NoopSegmentMetadataCache()
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
objectMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
@@ -120,7 +120,8 @@ public void setUp() throws Exception
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
new NoopSegmentMetadataCache()
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
jsonMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
@@ -793,7 +793,7 @@ public void testRunWithMinimumMessageTime() throws Exception

final ListenableFuture<TaskStatus> future = runTask(task);

waitUntil(task, this::isTaskReading);
waitUntil(task, this::isTaskPublishing);

// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
@@ -856,7 +856,7 @@ public void testRunWithMaximumMessageTime() throws Exception

final ListenableFuture<TaskStatus> future = runTask(task);

waitUntil(task, this::isTaskReading);
waitUntil(task, this::isTaskPublishing);

// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
@@ -915,7 +915,7 @@ public void testRunWithTransformSpec() throws Exception
);

final ListenableFuture<TaskStatus> future = runTask(task);
waitUntil(task, this::isTaskReading);
waitUntil(task, this::isTaskPublishing);

// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
@@ -2461,6 +2461,16 @@ private boolean isTaskReading(KinesisIndexTask task)
return task.getRunner().getStatus() == SeekableStreamIndexTaskRunner.Status.READING;
}

/**
 * Returns true if the specified task is in the PUBLISHING state.
 *
 * @param task the {@link KinesisIndexTask} whose state is checked
 * @return true if the task is in the PUBLISHING state, otherwise false
*/
private boolean isTaskPublishing(KinesisIndexTask task)
{
return task.getRunner().getStatus() == SeekableStreamIndexTaskRunner.Status.PUBLISHING;
}

private static KinesisRecordEntity kjb(
String timestamp,
String dim1,
@@ -95,6 +95,7 @@
import org.jboss.netty.handler.codec.http.HttpResponseStatus;

import javax.servlet.http.HttpServletRequest;
import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
@@ -114,6 +115,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;


@@ -122,6 +124,8 @@ public class SqlStatementResource
{

public static final String RESULT_FORMAT = "__resultFormat";
public static final String CONTENT_DISPOSITION_RESPONSE_HEADER = "Content-Disposition";
private static final Pattern FILENAME_PATTERN = Pattern.compile("^[^/:*?><\\\\\"|\0\n\r]*$");
private static final Logger log = new Logger(SqlStatementResource.class);
private final SqlStatementFactory msqSqlStatementFactory;
private final ObjectMapper jsonMapper;
@@ -277,6 +281,7 @@ public Response doGetResults(
@PathParam("id") final String queryId,
@QueryParam("page") Long page,
@QueryParam("resultFormat") String resultFormat,
@QueryParam("filename") String filename,
@Context final HttpServletRequest req
)
{
@@ -309,10 +314,12 @@
);
throwIfQueryIsNotSuccessful(queryId, statusPlus);

final String contentDispositionHeaderValue = filename != null ? StringUtils.format("attachment; filename=\"%s\"", validateFilename(filename)) : null;

Optional<List<ColumnNameAndTypes>> signature = SqlStatementResourceHelper.getSignature(msqControllerTask);
if (!signature.isPresent() || MSQControllerTask.isIngestion(msqControllerTask.getQuerySpec())) {
// Since it's not a select query, nothing to return.
return Response.ok().build();
return addContentDisposition(Response.ok(), contentDispositionHeaderValue).build();
}

// returning results
@@ -321,18 +328,20 @@
results = getResultYielder(queryId, page, msqControllerTask, closer);
if (!results.isPresent()) {
// no results, return empty
return Response.ok().build();
return addContentDisposition(Response.ok(), contentDispositionHeaderValue).build();
}

ResultFormat preferredFormat = getPreferredResultFormat(resultFormat, msqControllerTask.getQuerySpec());
return Response.ok((StreamingOutput) outputStream -> resultPusher(
final Response.ResponseBuilder responseBuilder = Response.ok((StreamingOutput) outputStream -> resultPusher(
queryId,
signature,
closer,
results,
new CountingOutputStream(outputStream),
preferredFormat
)).build();
));

return addContentDisposition(responseBuilder, contentDispositionHeaderValue).build();
}


@@ -976,6 +985,42 @@ private void checkForDurableStorageConnectorImpl()
}
}

private static Response.ResponseBuilder addContentDisposition(
Response.ResponseBuilder responseBuilder,
String contentDisposition
)
{
if (contentDisposition != null) {
responseBuilder.header(CONTENT_DISPOSITION_RESPONSE_HEADER, contentDisposition);
}
return responseBuilder;
}

/**
 * Validates a filename. A filename is considered valid if it:
 * <ul>
 * <li>Is not empty.</li>
 * <li>Is no longer than 255 characters.</li>
 * <li>Does not contain any of the characters `/`, `\`, `:`, `*`, `?`, `"`, `<`, `>`, `|`, `\0`, `\n`, or `\r`.</li>
* </ul>
*/
@VisibleForTesting
static String validateFilename(@NotNull String filename)
{
if (filename.isEmpty()) {
throw InvalidInput.exception("Filename cannot be empty.");
}

if (filename.length() > 255) {
throw InvalidInput.exception("Filename cannot be longer than 255 characters.");
}

if (!FILENAME_PATTERN.matcher(filename).matches()) {
throw InvalidInput.exception("Filename contains invalid characters. (/, \\, :, *, ?, \", <, >, |, \0, \n, or \r)");
}
return filename;
}

private <T> T contactOverlord(final ListenableFuture<T> future, String queryId)
{
try {
@@ -19,15 +19,17 @@

package org.apache.druid.msq.quidem;

import org.apache.druid.quidem.DruidQuidemCommandHandler;
import org.apache.druid.quidem.DruidQuidemTestBase;
import org.apache.druid.quidem.ProjectPathUtils;

import java.io.File;

public class MSQQuidemTest extends DruidQuidemTestBase
{
public MSQQuidemTest()
{
super();
super(new DruidQuidemCommandHandler());
}

@Override
Expand Down
@@ -393,6 +393,7 @@ public void testWithDurableStorage() throws IOException
sqlStatementResult.getQueryId(),
null,
ResultFormat.OBJECTLINES.name(),
null,
SqlStatementResourceTest.makeOkRequest()
),
objectMapper
@@ -406,6 +407,7 @@
sqlStatementResult.getQueryId(),
0L,
ResultFormat.OBJECTLINES.name(),
null,
SqlStatementResourceTest.makeOkRequest()
),
objectMapper
@@ -419,6 +421,7 @@
sqlStatementResult.getQueryId(),
2L,
ResultFormat.OBJECTLINES.name(),
null,
SqlStatementResourceTest.makeOkRequest()
),
objectMapper
@@ -485,41 +488,47 @@ public void testMultipleWorkersWithPageSizeLimiting() throws IOException
sqlStatementResult.getQueryId(),
null,
ResultFormat.ARRAY.name(),
null,
SqlStatementResourceTest.makeOkRequest()
)));

Assert.assertEquals(rows.subList(0, 2), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
sqlStatementResult.getQueryId(),
0L,
ResultFormat.ARRAY.name(),
null,
SqlStatementResourceTest.makeOkRequest()
)));

Assert.assertEquals(rows.subList(2, 4), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
sqlStatementResult.getQueryId(),
1L,
ResultFormat.ARRAY.name(),
null,
SqlStatementResourceTest.makeOkRequest()
)));

Assert.assertEquals(rows.subList(4, 6), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
sqlStatementResult.getQueryId(),
2L,
ResultFormat.ARRAY.name(),
null,
SqlStatementResourceTest.makeOkRequest()
)));

Assert.assertEquals(rows.subList(6, 8), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
sqlStatementResult.getQueryId(),
3L,
ResultFormat.ARRAY.name(),
null,
SqlStatementResourceTest.makeOkRequest()
)));

Assert.assertEquals(rows.subList(8, 10), SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
sqlStatementResult.getQueryId(),
4L,
ResultFormat.ARRAY.name(),
null,
SqlStatementResourceTest.makeOkRequest()
)));
}
@@ -565,6 +574,7 @@ public void testResultFormat() throws Exception
sqlStatementResult.getQueryId(),
null,
resultFormat.name(),
null,
SqlStatementResourceTest.makeOkRequest()
), objectMapper
)
@@ -577,6 +587,7 @@
sqlStatementResult.getQueryId(),
0L,
resultFormat.name(),
null,
SqlStatementResourceTest.makeOkRequest()
), objectMapper
)
@@ -616,13 +627,15 @@ public void testResultFormatWithParamInSelect() throws IOException
sqlStatementResult.getQueryId(),
null,
ResultFormat.ARRAY.name(),
null,
SqlStatementResourceTest.makeOkRequest()
)));

Assert.assertEquals(rows, SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
sqlStatementResult.getQueryId(),
0L,
ResultFormat.ARRAY.name(),
null,
SqlStatementResourceTest.makeOkRequest()
)));
}
@@ -695,6 +708,7 @@ public void testInsert()
actual.getQueryId(),
0L,
null,
null,
SqlStatementResourceTest.makeOkRequest()
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), resultsResponse.getStatus());
@@ -738,6 +752,7 @@ public void testReplaceAll()
actual.getQueryId(),
0L,
null,
null,
SqlStatementResourceTest.makeOkRequest()
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), resultsResponse.getStatus());