6 changes: 3 additions & 3 deletions .github/workflows/cron-job-unit-tests.yml
@@ -29,7 +29,7 @@ jobs:
strategy:
fail-fast: false
matrix:
-jdk: [ '11', '21' ]
+jdk: [ '11', '21.0.4' ]
Dtest: [ "A*,F*,S*", "B*,D*,L*,T*", "C*,O*", "E*,N*,Q*", "G*,R*,U*", "H*,I*,J*", "K*,P*,V*,W*,X*,Y*,Z*", "M*"]
uses: ./.github/workflows/worker.yml
with:
@@ -45,7 +45,7 @@ jobs:
strategy:
fail-fast: false
matrix:
-jdk: [ '11', '21' ]
+jdk: [ '11', '21.0.4' ]
steps:
- name: Download reports for all unit test jobs
uses: actions/download-artifact@v4
@@ -72,7 +72,7 @@ jobs:
strategy:
fail-fast: false
matrix:
-jdk: [ '11', '21' ]
+jdk: [ '11', '21.0.4' ]
with:
script: .github/scripts/create-jacoco-coverage-report.sh
artifacts_to_download: "unit-test-reports-${{ matrix.jdk }}-*"
3 changes: 2 additions & 1 deletion .github/workflows/static-checks.yml
@@ -41,7 +41,8 @@ jobs:
strategy:
fail-fast: false
matrix:
-java: [ '11', '17', '21' ]
+# Use JDK 21.0.4 to work around https://github.com/apache/druid/issues/17429
+java: [ '11', '17', '21.0.4' ]
runs-on: ubuntu-latest
steps:
- name: checkout branch
9 changes: 5 additions & 4 deletions .github/workflows/unit-and-integration-tests-unified.yml
@@ -79,7 +79,8 @@ jobs:
strategy:
fail-fast: false
matrix:
-jdk: [ '11', '17', '21' ]
+# Use JDK 21.0.4 to work around https://github.com/apache/druid/issues/17429
+jdk: [ '11', '17', '21.0.4' ]
runs-on: ubuntu-latest
steps:
- name: Checkout branch
@@ -208,12 +209,12 @@ jobs:

standard-its:
needs: [build, unit-tests-unapproved, check-approval]
-if: ${{ always() && (needs.check-approval.outputs.approved == 'true' || needs.unit-tests-unapproved.result == 'success') }}
+if: ${{ !cancelled() && (needs.check-approval.outputs.approved == 'true' || needs.unit-tests-unapproved.result == 'success') }}
uses: ./.github/workflows/standard-its.yml

revised-its:
needs: [build, unit-tests-unapproved, check-approval]
-if: ${{ always() && (needs.check-approval.outputs.approved == 'true' || needs.unit-tests-unapproved.result == 'success') }}
+if: ${{ !cancelled() && (needs.check-approval.outputs.approved == 'true' || needs.unit-tests-unapproved.result == 'success') }}
uses: ./.github/workflows/revised-its.yml
with:
BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ needs.set-env-var.outputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
@@ -224,6 +225,6 @@ jobs:
actions-timeline:
needs: [build, unit-tests-approved, unit-tests-unapproved, revised-its, standard-its]
runs-on: ubuntu-latest
-if: ${{ always() }}
+if: ${{ !cancelled() }}
steps:
- uses: Kesin11/actions-timeline@427ee2cf860166e404d0d69b4f2b24012bb7af4f
2 changes: 1 addition & 1 deletion dev/intellij-setup.md
@@ -74,7 +74,7 @@ Intellij IDEA debugger can attach to a local or remote Java process (Druid proce
Follow these steps to debug a Druid process using the IntelliJ IDEA debugger.
1. Enable debugging in the Druid process
1. For Druid services (such as Overlord, Coordinator, etc), include the following as JVM config `-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=<PORT>` where `<PORT>` is any available port. Note that different port values should be chosen for each Druid service.
-2. For the peons (workers on Middlemanager), include the following `agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=0` in runtime properties `druid.indexer.runner.javaOpts` of the Middle Manager. Note that `address=0` will tell the debugger to assign ephemeral port.
+2. For the peons (workers on Middlemanager), add the string `"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=0"` to the runtime property `druid.indexer.runner.javaOptsArray` of the Middle Manager. Note that `address=0` will tell the debugger to assign an ephemeral port.
2. Find the port assigned to the Druid process.
1. For Druid services, the port value is what you chose in the JVM argument of step 1i above. The port value can also be found in the first line of each Druid service log.
2. For the peons (workers on Middlemanager), you can find the assigned ephemeral port by checking the first line of the task log.
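
For illustration, a minimal sketch of both settings (the port value 5005 and the property layout are assumptions, not prescribed values):

```properties
# jvm.config of a Druid service: listen for a debugger on a fixed port (5005 here)
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005

# Middle Manager runtime.properties: each peon asks the JVM for an ephemeral debug port
druid.indexer.runner.javaOptsArray=["-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=0"]
```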
111 changes: 108 additions & 3 deletions docs/api-reference/automatic-compaction-api.md
@@ -258,6 +258,102 @@ Host: http://ROUTER_IP:ROUTER_PORT

A successful request returns an HTTP `200 OK` message code and an empty response body.

### Update cluster-level compaction config

Updates the cluster-level configuration for compaction tasks, which applies to all datasources unless explicitly overridden in a datasource's compaction config.
This includes the following fields:

|Config|Description|Default value|
|------|-----------|-------------|
|`compactionTaskSlotRatio`|Ratio of the number of task slots taken up by compaction tasks to the total number of task slots across all workers.|0.1|
|`maxCompactionTaskSlots`|Maximum number of task slots that compaction tasks and sub-tasks can take up. The minimum number of task slots available for compaction is 1. When using the MSQ engine, or the native engine with range partitioning, a single compaction job occupies more than one task slot; in that case the minimum is 2, so that at least one compaction job can always run in the cluster.|2147483647 (i.e. total task slots)|
|`compactionPolicy`|Policy to choose intervals for compaction. Currently, the only supported policy is [Newest segment first](#compaction-policy-newestsegmentfirst).|Newest segment first|
|`useSupervisors`|Whether compaction should be run on Overlord using supervisors instead of Coordinator duties.|false|
|`engine`|Engine used for running compaction tasks, unless overridden in the datasource-level compaction config. Possible values are `native` and `msq`. The `msq` engine can be used for compaction only if `useSupervisors` is `true`.|`native`|
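
For example, assuming the two limits combine as the smaller of the two (an assumption, not stated in this table): in a cluster with 1,000 total task slots and the defaults above, compaction can use at most min(0.1 × 1000, 2147483647) = 100 task slots.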

#### Compaction policy `newestSegmentFirst`

|Field|Description|Default value|
|-----|-----------|-------------|
|`type`|This must always be `newestSegmentFirst`||
|`priorityDatasource`|Datasource to prioritize for compaction. The intervals of this datasource are chosen for compaction before the intervals of any other datasource. Within this datasource, the intervals are prioritized based on the chosen compaction policy.|None|


#### URL

`POST` `/druid/coordinator/v1/config/compaction/cluster`

#### Responses

<Tabs>

<TabItem value="8" label="200 SUCCESS">


*Successfully updated compaction configuration*

</TabItem>
<TabItem value="9" label="404 NOT FOUND">


*Invalid `max` value*

</TabItem>
</Tabs>

---

#### Sample request

<Tabs>

<TabItem value="10" label="cURL">


```shell
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/config/compaction/cluster" \
--header 'Content-Type: application/json' \
--data '{
"compactionTaskSlotRatio": 0.5,
"maxCompactionTaskSlots": 1500,
"compactionPolicy": {
"type": "newestSegmentFirst",
"priorityDatasource": "wikipedia"
},
"useSupervisors": true,
"engine": "msq"
}'

```

</TabItem>
<TabItem value="11" label="HTTP">


```HTTP
POST /druid/coordinator/v1/config/compaction/cluster HTTP/1.1
Host: http://ROUTER_IP:ROUTER_PORT
Content-Type: application/json

{
"compactionTaskSlotRatio": 0.5,
"maxCompactionTaskSlots": 1500,
"compactionPolicy": {
"type": "newestSegmentFirst",
"priorityDatasource": "wikipedia"
},
"useSupervisors": true,
"engine": "msq"
}
```

</TabItem>
</Tabs>

#### Sample response

A successful request returns an HTTP `200 OK` message code and an empty response body.

## View automatic compaction configuration

### Get all automatic compaction configurations
@@ -406,7 +502,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
],
"compactionTaskSlotRatio": 0.1,
"maxCompactionTaskSlots": 2147483647,
"useAutoScaleSlots": false

}
```
</details>
@@ -593,7 +689,12 @@ Host: http://ROUTER_IP:ROUTER_PORT
"globalConfig": {
"compactionTaskSlotRatio": 0.1,
"maxCompactionTaskSlots": 2147483647,
"useAutoScaleSlots": false
"compactionPolicy": {
"type": "newestSegmentFirst",
"priorityDatasource": "wikipedia"
},
"useSupervisors": true,
"engine": "native"
},
"compactionConfig": {
"dataSource": "wikipedia_hour",
@@ -624,7 +725,11 @@ Host: http://ROUTER_IP:ROUTER_PORT
"globalConfig": {
"compactionTaskSlotRatio": 0.1,
"maxCompactionTaskSlots": 2147483647,
"useAutoScaleSlots": false
"compactionPolicy": {
"type": "newestSegmentFirst"
},
"useSupervisors": false,
"engine": "native"
},
"compactionConfig": {
"dataSource": "wikipedia_hour",
9 changes: 5 additions & 4 deletions docs/data-management/automatic-compaction.md
@@ -243,11 +243,12 @@ You can run automatic compaction using compaction supervisors on the Overlord ra
* Tracked compaction task status to avoid re-compacting an interval repeatedly


-To use compaction supervisors, set the following properties in your Overlord runtime properties:
-* `druid.supervisor.compaction.enabled` to `true` so that compaction tasks can be run as supervisor tasks
-* `druid.supervisor.compaction.engine` to `msq` to specify the MSQ task engine as the compaction engine or to `native` to use the native engine. This is the default engine if the `engine` field is omitted from your compaction config
+To use compaction supervisors, update the [compaction dynamic config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config) and set:
+
+* `useSupervisors` to `true` so that compaction tasks can be run as supervisor tasks
+* `engine` to `msq` to use the MSQ task engine as the compaction engine or to `native` (default value) to use the native engine.

-Compaction supervisors use the same syntax as auto-compaction using Coordinator duties with one key difference: you submit the auto-compaction as a a supervisor spec. In the spec, set the `type` to `autocompact` and include the auto-compaction config in the `spec`.
+Compaction supervisors use the same syntax as auto-compaction using Coordinator duties with one key difference: you submit the auto-compaction as a supervisor spec. In the spec, set the `type` to `autocompact` and include the auto-compaction config in the `spec`.
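
A sketch of that shape (the datasource name and inner fields are illustrative placeholders, not part of this change):

```json
{
  "type": "autocompact",
  "spec": {
    "dataSource": "wikipedia_hour",
    "skipOffsetFromLatest": "PT1H"
  }
}
```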

To submit an automatic compaction task, you can submit a supervisor spec through the [web console](#manage-compaction-supervisors-with-the-web-console) or the [supervisor API](#manage-compaction-supervisors-with-supervisor-apis).

2 changes: 1 addition & 1 deletion docs/development/extensions-core/s3.md
@@ -117,7 +117,7 @@ The AWS SDK requires that a target region be specified. You can set these by us
For example, to set the region to 'us-east-1' through system properties:

* Add `-Daws.region=us-east-1` to the `jvm.config` file for all Druid services.
-* Add `-Daws.region=us-east-1` to `druid.indexer.runner.javaOpts` in [Middle Manager configuration](../../configuration/index.md#middle-manager-configuration) so that the property will be passed to Peon (worker) processes.
+* Add `"-Daws.region=us-east-1"` to `druid.indexer.runner.javaOptsArray` in [Middle Manager configuration](../../configuration/index.md#middle-manager-configuration) so that the property will be passed to Peon (worker) processes.

### Connecting to S3 configuration

@@ -19,6 +19,7 @@

package org.apache.druid.msq.querykit;

+import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec;
import org.apache.druid.msq.kernel.MixShuffleSpec;
@@ -69,11 +70,21 @@ public static ShuffleSpecFactory globalSortWithMaxPartitionCount(final int parti
}

/**
- * Factory that produces globally sorted partitions of a target size.
+ * Factory that produces globally sorted partitions of a target size, using the {@link ClusterBy} to partition
+ * rows across partitions.
+ *
+ * Produces {@link MixShuffleSpec}, ignoring the target size, if the provided {@link ClusterBy} is empty.
*/
public static ShuffleSpecFactory getGlobalSortWithTargetSize(int targetSize)
{
-return (clusterBy, aggregate) ->
-    new GlobalSortTargetSizeShuffleSpec(clusterBy, targetSize, aggregate);
+return (clusterBy, aggregate) -> {
+  if (clusterBy.isEmpty()) {
+    // Cannot partition or sort meaningfully because there are no cluster-by keys. Generate a MixShuffleSpec
+    // so everything goes into a single partition.
+    return MixShuffleSpec.instance();
+  } else {
+    return new GlobalSortTargetSizeShuffleSpec(clusterBy, targetSize, aggregate);
+  }
+};
}
}
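
A hypothetical caller-side sketch of the new behavior (assuming `ShuffleSpecFactory` declares `ShuffleSpec build(ClusterBy clusterBy, boolean aggregate)`, as the lambda above suggests, and that `ClusterBy.none()` denotes the empty key):

```java
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.msq.kernel.ShuffleSpec;

// With no cluster-by keys, the factory now yields a MixShuffleSpec (a single
// unsorted partition) instead of attempting a global sort over an empty key.
ShuffleSpecFactory factory = ShuffleSpecFactories.getGlobalSortWithTargetSize(3_000_000);
ShuffleSpec spec = factory.build(ClusterBy.none(), false);
```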
@@ -1606,6 +1606,38 @@
.verifyResults();
}

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testInsertOnFoo1NoDimensionsWithLimit(String contextName, Map<String, Object> context)

Check notice (Code scanning / CodeQL): Useless parameter (Note, test). The parameter 'contextName' is never used.

Copilot Autofix suggested removing the unused parameter in extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java:

@@ -1610,3 +1610,3 @@
   @ParameterizedTest(name = "{index}:with context {0}")
-  public void testInsertOnFoo1NoDimensionsWithLimit(String contextName, Map<String, Object> context)
+  public void testInsertOnFoo1NoDimensionsWithLimit(Map<String, Object> context)
   {
{
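// Note: with only count(*) and PARTITIONED BY ALL, this insert produces a shuffle with an
// empty ClusterBy, which presumably exercises the MixShuffleSpec path added in ShuffleSpecFactories.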
Map<String, Object> queryContext = ImmutableMap.<String, Object>builder()
.putAll(context)
.put(MultiStageQueryContext.CTX_ROWS_PER_SEGMENT, 2)
.build();

List<Object[]> expectedRows = ImmutableList.of(new Object[]{DateTimes.utc(0L).getMillis(), 5L});

RowSignature rowSignature = RowSignature.builder()
.addTimeColumn()
.add("cnt", ColumnType.LONG)
.build();

testIngestQuery()
.setSql("insert into foo1 select count(*) cnt from foo where dim1 != '' limit 4 partitioned by all")
.setExpectedDataSource("foo1")
.setQueryContext(queryContext)
.setExpectedRowSignature(rowSignature)
.setExpectedSegments(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0)))
.setExpectedResultRows(expectedRows)
.setExpectedMSQSegmentReport(
new MSQSegmentReport(
NumberedShardSpec.class.getSimpleName(),
"Using NumberedShardSpec to generate segments since the query is inserting rows."
)
)
.verifyResults();
}

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testInsertOnRestricted(String contextName, Map<String, Object> context)