Skip to content

Commit 91614fa

Browse files
risingwave-cishanickyCopilot
authored
refactor: improve stream parallelism resolution and error handling (#23921) (#23926)
Signed-off-by: Peng Chen <[email protected]> Co-authored-by: Shanicky Chen <[email protected]> Co-authored-by: Copilot <[email protected]>
1 parent a62466d commit 91614fa

File tree

3 files changed

+206
-32
lines changed

3 files changed

+206
-32
lines changed

src/meta/src/rpc/ddl_controller.rs

Lines changed: 122 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1589,8 +1589,24 @@ impl DdlController {
15891589
cluster_info: &StreamingClusterInfo,
15901590
resource_group: String,
15911591
) -> MetaResult<NonZeroUsize> {
1592-
let available = cluster_info.parallelism(&resource_group);
1593-
let Some(available) = NonZeroUsize::new(available) else {
1592+
let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1593+
DdlController::resolve_stream_parallelism_inner(
1594+
specified,
1595+
max,
1596+
available,
1597+
&self.env.opts.default_parallelism,
1598+
&resource_group,
1599+
)
1600+
}
1601+
1602+
fn resolve_stream_parallelism_inner(
1603+
specified: Option<NonZeroUsize>,
1604+
max: NonZeroUsize,
1605+
available: Option<NonZeroUsize>,
1606+
default_parallelism: &DefaultParallelism,
1607+
resource_group: &str,
1608+
) -> MetaResult<NonZeroUsize> {
1609+
let Some(available) = available else {
15941610
bail_unavailable!(
15951611
"no available slots to schedule in resource group \"{}\", \
15961612
have you allocated any compute nodes within this resource group?",
@@ -1607,42 +1623,40 @@ impl DdlController {
16071623
);
16081624
}
16091625
if specified > available {
1610-
bail_unavailable!(
1611-
"insufficient parallelism to schedule in resource group \"{}\", \
1612-
required: {}, available: {}",
1626+
tracing::warn!(
16131627
resource_group,
1614-
specified,
1615-
available,
1628+
specified_parallelism = specified.get(),
1629+
available_parallelism = available.get(),
1630+
"specified parallelism exceeds available slots, scheduling with specified value",
16161631
);
16171632
}
1618-
Ok(specified)
1619-
} else {
1620-
// Use configured parallelism if no default parallelism is specified.
1621-
let default_parallelism = match self.env.opts.default_parallelism {
1622-
DefaultParallelism::Full => available,
1623-
DefaultParallelism::Default(num) => {
1624-
if num > available {
1625-
bail_unavailable!(
1626-
"insufficient parallelism to schedule in resource group \"{}\", \
1627-
required: {}, available: {}",
1628-
resource_group,
1629-
num,
1630-
available,
1631-
);
1632-
}
1633-
num
1634-
}
1635-
};
1633+
return Ok(specified);
1634+
}
16361635

1637-
if default_parallelism > max {
1638-
tracing::warn!(
1639-
max_parallelism = max.get(),
1640-
resource_group,
1641-
"too many parallelism available, use max parallelism instead",
1642-
);
1636+
// Use default parallelism when no specific parallelism is provided by the user.
1637+
let default_parallelism = match default_parallelism {
1638+
DefaultParallelism::Full => available,
1639+
DefaultParallelism::Default(num) => {
1640+
if *num > available {
1641+
tracing::warn!(
1642+
resource_group,
1643+
configured_parallelism = num.get(),
1644+
available_parallelism = available.get(),
1645+
"default parallelism exceeds available slots, scheduling with configured value",
1646+
);
1647+
}
1648+
*num
16431649
}
1644-
Ok(default_parallelism.min(max))
1650+
};
1651+
1652+
if default_parallelism > max {
1653+
tracing::warn!(
1654+
max_parallelism = max.get(),
1655+
resource_group,
1656+
"default parallelism exceeds max parallelism, capping to max",
1657+
);
16451658
}
1659+
Ok(default_parallelism.min(max))
16461660
}
16471661

16481662
/// Builds the actor graph:
@@ -2356,3 +2370,79 @@ pub fn refill_upstream_sink_union_in_table(
23562370
}
23572371
});
23582372
}
2373+
2374+
#[cfg(test)]
2375+
mod tests {
2376+
use std::num::NonZeroUsize;
2377+
2378+
use super::*;
2379+
2380+
#[test]
2381+
fn test_specified_parallelism_exceeds_available() {
2382+
let result = DdlController::resolve_stream_parallelism_inner(
2383+
Some(NonZeroUsize::new(100).unwrap()),
2384+
NonZeroUsize::new(256).unwrap(),
2385+
Some(NonZeroUsize::new(4).unwrap()),
2386+
&DefaultParallelism::Full,
2387+
"default",
2388+
)
2389+
.unwrap();
2390+
assert_eq!(result.get(), 100);
2391+
}
2392+
2393+
#[test]
2394+
fn test_allows_default_parallelism_over_available() {
2395+
let result = DdlController::resolve_stream_parallelism_inner(
2396+
None,
2397+
NonZeroUsize::new(256).unwrap(),
2398+
Some(NonZeroUsize::new(4).unwrap()),
2399+
&DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
2400+
"default",
2401+
)
2402+
.unwrap();
2403+
assert_eq!(result.get(), 50);
2404+
}
2405+
2406+
#[test]
2407+
fn test_full_parallelism_capped_by_max() {
2408+
let result = DdlController::resolve_stream_parallelism_inner(
2409+
None,
2410+
NonZeroUsize::new(6).unwrap(),
2411+
Some(NonZeroUsize::new(10).unwrap()),
2412+
&DefaultParallelism::Full,
2413+
"default",
2414+
)
2415+
.unwrap();
2416+
assert_eq!(result.get(), 6);
2417+
}
2418+
2419+
#[test]
2420+
fn test_no_available_slots_returns_error() {
2421+
let result = DdlController::resolve_stream_parallelism_inner(
2422+
None,
2423+
NonZeroUsize::new(4).unwrap(),
2424+
None,
2425+
&DefaultParallelism::Full,
2426+
"default",
2427+
);
2428+
assert!(matches!(
2429+
result,
2430+
Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
2431+
));
2432+
}
2433+
2434+
#[test]
2435+
fn test_specified_over_max_returns_error() {
2436+
let result = DdlController::resolve_stream_parallelism_inner(
2437+
Some(NonZeroUsize::new(8).unwrap()),
2438+
NonZeroUsize::new(4).unwrap(),
2439+
Some(NonZeroUsize::new(10).unwrap()),
2440+
&DefaultParallelism::Full,
2441+
"default",
2442+
);
2443+
assert!(matches!(
2444+
result,
2445+
Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
2446+
));
2447+
}
2448+
}

src/tests/simulation/tests/integration_tests/scale/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ mod nexmark_chaos;
2323
mod nexmark_q4;
2424
mod nexmark_source;
2525
mod no_shuffle;
26+
mod parallelism_exceeds_cores;
2627
mod resource_group;
2728
mod schedulability;
2829
mod shared_source;
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright 2025 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use anyhow::Result;
16+
use risingwave_common::hash::VirtualNode;
17+
use risingwave_simulation::cluster::{Cluster, Configuration};
18+
use risingwave_simulation::ctl_ext::predicate::identity_contains;
19+
20+
#[tokio::test]
21+
async fn test_fragment_parallelism_can_exceed_physical_cores() -> Result<()> {
22+
let configuration = Configuration::for_scale();
23+
let total_cores = configuration.total_streaming_cores() as usize;
24+
let mut cluster = Cluster::start(configuration).await?;
25+
let max_parallelism = VirtualNode::COUNT_FOR_COMPAT;
26+
27+
// Expect creating a job with parallelism larger than max_parallelism to be rejected.
28+
let mut session = cluster.start_session();
29+
30+
session
31+
.run(format!(
32+
"set streaming_parallelism = {}",
33+
max_parallelism + 1
34+
))
35+
.await?;
36+
37+
assert!(session.run("create table t (v int);").await.is_err());
38+
39+
let new_parallelism = total_cores + 1;
40+
41+
session
42+
.run(format!("set streaming_parallelism = {new_parallelism}"))
43+
.await?;
44+
45+
// With a value within max_parallelism, creation succeeds; we will bump further afterward.
46+
session.run("create table t (v int);").await?;
47+
48+
let fragment = cluster
49+
.locate_one_fragment([identity_contains("materialize")])
50+
.await?;
51+
let fragment_id = fragment.id();
52+
assert_eq!(fragment.inner.actors.len(), new_parallelism);
53+
54+
let new_parallelism = total_cores + 2;
55+
56+
// Raise table parallelism beyond physical cores; fragment actor count should follow.
57+
session
58+
.run(&format!(
59+
"alter table t set parallelism = {new_parallelism};"
60+
))
61+
.await?;
62+
63+
let updated_fragment = cluster
64+
.locate_one_fragment([identity_contains("materialize")])
65+
.await?;
66+
assert_eq!(updated_fragment.inner.actors.len(), new_parallelism);
67+
68+
let new_parallelism = total_cores + 3;
69+
70+
// Alter fragment directly to an even higher parallelism and verify actor count.
71+
session
72+
.run(&format!(
73+
"alter fragment {fragment_id} set parallelism = {new_parallelism};"
74+
))
75+
.await?;
76+
77+
let updated_fragment = cluster
78+
.locate_one_fragment([identity_contains("materialize")])
79+
.await?;
80+
assert_eq!(updated_fragment.inner.actors.len(), new_parallelism);
81+
82+
Ok(())
83+
}

0 commit comments

Comments
 (0)