From 465137b2c85881a34e8c1d7be43551b8438aa858 Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Thu, 21 May 2026 12:44:50 -0400 Subject: [PATCH 1/2] Perform ConsolidateBlocks analysis in parallel This commit updates the ConsolidateBlocks transpiler pass to internally be multithreaded. It will run the analysis of blocks in parallel and make the determination on whether to do a consolidation or not and collect that result into an intermediate vector. Then serially we'll iterate over the vector and update the dag based on the result of the analysis. This speeds up the runtime performance of the pass especially as the analysis is the more computationally expensive part of the pass. This change is still relevant even after #16136 merges as the new TwoQubitPeepholeOptimization pass is only being used to replace the use of ConsolidateBlocks in the optimization loop. We still run the pass as part of the init stage by default and this will speed up that execution. --- .../src/passes/consolidate_blocks.rs | 522 +++++++++++------- 1 file changed, 316 insertions(+), 206 deletions(-) diff --git a/crates/transpiler/src/passes/consolidate_blocks.rs b/crates/transpiler/src/passes/consolidate_blocks.rs index 67b9e461ef26..e871a6097683 100644 --- a/crates/transpiler/src/passes/consolidate_blocks.rs +++ b/crates/transpiler/src/passes/consolidate_blocks.rs @@ -10,6 +10,11 @@ // copyright notice, and modified files need to carry a notice indicating // that they have been altered from the originals. +use std::cell::RefCell; + +use rayon::prelude::*; +use thread_local::ThreadLocal; + use super::optimize_1q_gates_decomposition::matmul_1q; use hashbrown::{HashMap, HashSet}; use nalgebra::{Matrix2, Matrix4, U4}; @@ -46,6 +51,7 @@ use smallvec::SmallVec; use crate::passes::unitary_synthesis::{PARAM_SET, TWO_QUBIT_BASIS_SET}; use crate::target::{Qargs, Target}; use qiskit_circuit::PhysicalQubit; +use qiskit_util::getenv_use_multiple_threads; static IDENTITY_2Q: Matrix4 = Matrix4::new( // Row 1 @@ -219,10 +225,252 @@ impl PhysQargsMap { } } +enum ConsolidateResult { + Identity, + NoConsolidate, + Matrix(HashMap, UnitaryGate), + Replace(UnitaryGate), +} + +#[allow(clippy::too_many_arguments)] +fn should_substitute( + dag: &DAGCircuit, + decomposer: Option<&DecomposerType>, + target: Option<&Target>, + basis_gates: Option<&HashSet>, + basis_gate_name: &str, + block: &[NodeIndex], + block_qargs: &mut HashSet, + phys_qargs: &mut PhysQargsMap, + force_consolidate: bool, +) -> PyResult { + block_qargs.clear(); + if block.len() == 1 { + let inst_node = block[0]; + let inst = dag[inst_node].unwrap_operation(); + if !is_supported( + target, + basis_gates, + inst.op.name(), + phys_qargs.get(dag, inst.qubits), + ) { + let num_qubits = inst.op.num_qubits(); + let unitary_gate = if num_qubits == 1 { + let matrix = match get_1q_matrix_from_inst(inst) { + Ok(mat) => mat, + Err(_) => return Ok(ConsolidateResult::NoConsolidate), + }; + UnitaryGate { + array: ArrayType::OneQ(matrix), + } + } else if num_qubits == 2 { + let matrix = match get_2q_matrix_from_inst(inst) { + Ok(mat) => mat, + Err(_) => return Ok(ConsolidateResult::NoConsolidate), + }; + UnitaryGate { + array: ArrayType::TwoQ(matrix), + } + } else { + let matrix = match get_matrix_from_inst(inst) { + Ok(mat) => mat, + Err(_) => return Ok(ConsolidateResult::NoConsolidate), + }; + UnitaryGate { + array: ArrayType::NDArray(matrix), + } + }; + return Ok(ConsolidateResult::Replace(unitary_gate)); + } + } + let mut basis_count: usize = 0; + let mut outside_basis = false; + for node in block { + let inst = dag[*node].unwrap_operation(); + block_qargs.extend(dag.get_qargs(inst.qubits)); + if inst.op.name() == basis_gate_name { + basis_count += 1; + } + if !is_supported( + target, + basis_gates, + inst.op.name(), + phys_qargs.get(dag, inst.qubits), + ) { + outside_basis = true; + } + } + if block_qargs.len() > 2 { + let mut qargs: Vec = block_qargs.iter().copied().collect(); + qargs.sort(); + let block_index_map: HashMap = qargs + .into_iter() + .enumerate() + .map(|(idx, qubit)| (qubit, idx)) + .collect(); + let circuit_data = CircuitData::from_packed_operations( + block_qargs.len() as u32, + 0, + block.iter().map(|node| { + let inst = dag[*node].unwrap_operation(); + + Ok(( + inst.op.clone(), + inst.params_view().iter().cloned().collect(), + dag.get_qargs(inst.qubits) + .iter() + .map(|x| Qubit::new(block_index_map[x])) + .collect(), + vec![], + )) + }), + Param::Float(0.), + )?; + let matrix = Python::attach(|py| -> PyResult<_> { + let circuit = circuit_data.into_py_quantum_circuit(py)?; + let matrix = QI_OPERATOR + .get_bound(py) + .call1((circuit,))? + .getattr(intern!(py, "data"))? + .extract::>()? + .as_array() + .to_owned(); + Ok(matrix) + })?; + let identity: Array2 = Array2::eye(2usize.pow(block_qargs.len() as u32)); + if approx::abs_diff_eq!(identity, matrix.view()) { + return Ok(ConsolidateResult::Identity); + } else { + let unitary_gate = UnitaryGate { + array: ArrayType::NDArray(matrix), + }; + return Ok(ConsolidateResult::Matrix(block_index_map, unitary_gate)); + } + } else { + let block_index_map = [ + *block_qargs.iter().min().unwrap(), + *block_qargs.iter().max().unwrap(), + ]; + let matrix = blocks_to_matrix(dag, block, block_index_map).ok(); + if let Some(matrix) = matrix { + let consolidate = if force_consolidate + || block.len() > MAX_2Q_DEPTH + || (basis_gates.is_some() && outside_basis) + || (target.is_some() && outside_basis) + { + true + } else { + let num_basis_gates = if let Some(ref decomposer) = decomposer { + match decomposer { + DecomposerType::TwoQubitBasis(decomp) => decomp.num_basis_gates_inner( + nalgebra_array_view::(matrix.as_view()), + )?, + DecomposerType::TwoQubitControlledU(decomp) => decomp + .num_basis_gates_inner(nalgebra_array_view::( + matrix.as_view(), + ))?, + } + } else { + unreachable!("A decomposer is always set unless force_consolidate is true"); + }; + num_basis_gates < basis_count + }; + if consolidate { + if approx::abs_diff_eq!(IDENTITY_2Q, matrix) { + return Ok(ConsolidateResult::Identity); + } else { + let unitary_gate = UnitaryGate { + array: ArrayType::TwoQ(matrix), + }; + let qubit_pos_map = block_index_map + .into_iter() + .enumerate() + .map(|(idx, qubit)| (qubit, idx)) + .collect(); + return Ok(ConsolidateResult::Matrix(qubit_pos_map, unitary_gate)); + } + } + } + } + Ok(ConsolidateResult::NoConsolidate) +} + +#[allow(clippy::too_many_arguments)] +fn consolidation_analysis_parallel<'a>( + dag: &DAGCircuit, + decomposer: Option, + target: Option<&Target>, + basis_gates: Option<&HashSet>, + basis_gate_name: &str, + blocks: &'a [Vec], + qubit_map: Option>, + force_consolidate: bool, +) -> PyResult> { + let block_qargs = ThreadLocal::new(); + let phys_qargs = ThreadLocal::new(); + blocks + .par_iter() + .map(|block| { + let block_qargs = block_qargs.get_or(|| RefCell::new(HashSet::with_capacity(2))); + let phys_qargs = + phys_qargs.get_or(|| RefCell::new(PhysQargsMap::new(qubit_map.clone()))); + let consolidate_result = should_substitute( + dag, + decomposer.as_ref(), + target, + basis_gates, + basis_gate_name, + block, + &mut block_qargs.borrow_mut(), + &mut phys_qargs.borrow_mut(), + force_consolidate, + )?; + Ok((block.as_slice(), consolidate_result)) + }) + .collect() +} + +fn apply_consolidation( + dag: &mut DAGCircuit, + block: &[NodeIndex], + consolidation: ConsolidateResult, +) -> PyResult<()> { + match consolidation { + ConsolidateResult::NoConsolidate => {} + ConsolidateResult::Identity => { + for index in block { + dag.remove_op_node(*index); + } + } + ConsolidateResult::Matrix(block_index_map, unitary_gate) => { + let clbit_pos_map = HashMap::new(); + dag.replace_block( + block, + PackedOperation::from_unitary(Box::new(unitary_gate)), + None, + None, + false, + &block_index_map, + &clbit_pos_map, + )?; + } + ConsolidateResult::Replace(unitary_gate) => { + dag.substitute_op( + block[0], + PackedOperation::from_unitary(Box::new(unitary_gate)), + None, + None, + )?; + } + } + Ok(()) +} + #[allow(clippy::too_many_arguments)] #[pyfunction] #[pyo3(name = "consolidate_blocks", signature = (dag, decomposer, basis_gate_name, force_consolidate, target=None, basis_gates=None, blocks=None, runs=None, qubit_map=None))] fn py_run_consolidate_blocks( + py: Python, dag: &mut DAGCircuit, decomposer: Option, basis_gate_name: &str, @@ -280,199 +528,45 @@ fn py_run_consolidate_blocks( .collect::, _>>() }) .transpose()?; - let mut all_block_gates: HashSet = - HashSet::with_capacity(blocks.iter().map(|x| x.len()).sum()); - // In most cases, the qargs in a block will not exceed 2 qubits. - let mut block_qargs: HashSet = HashSet::with_capacity(2); - let mut phys_qargs = PhysQargsMap::new(qubit_map); - for block in blocks { - block_qargs.clear(); - if block.len() == 1 { - let inst_node = block[0]; - let inst = dag[inst_node].unwrap_operation(); - if !is_supported( + let run_in_parallel = getenv_use_multiple_threads(); + if run_in_parallel && blocks.len() > 50 { + let consolidations = py.detach(|| { + consolidation_analysis_parallel( + dag, + decomposer, target, basis_gates.as_ref(), - inst.op.name(), - phys_qargs.get(dag, inst.qubits), - ) { - all_block_gates.insert(inst_node); - let num_qubits = inst.op.num_qubits(); - let unitary_gate = if num_qubits == 1 { - let matrix = match get_1q_matrix_from_inst(inst) { - Ok(mat) => mat, - Err(_) => continue, - }; - UnitaryGate { - array: ArrayType::OneQ(matrix), - } - } else if num_qubits == 2 { - let matrix = match get_2q_matrix_from_inst(inst) { - Ok(mat) => mat, - Err(_) => continue, - }; - UnitaryGate { - array: ArrayType::TwoQ(matrix), - } - } else { - let matrix = match get_matrix_from_inst(inst) { - Ok(mat) => mat, - Err(_) => continue, - }; - UnitaryGate { - array: ArrayType::NDArray(matrix), - } - }; - dag.substitute_op( - inst_node, - PackedOperation::from_unitary(Box::new(unitary_gate)), - None, - None, - )?; - continue; - } + basis_gate_name, + &blocks, + qubit_map.clone(), + force_consolidate, + ) + })?; + for (block, result) in consolidations { + apply_consolidation(dag, block, result)?; } - let mut basis_count: usize = 0; - let mut outside_basis = false; - for node in &block { - let inst = dag[*node].unwrap_operation(); - block_qargs.extend(dag.get_qargs(inst.qubits)); - all_block_gates.insert(*node); - if inst.op.name() == basis_gate_name { - basis_count += 1; - } - if !is_supported( + } else { + // In most cases, the qargs in a block will not exceed 2 qubits. + let mut block_qargs: HashSet = HashSet::with_capacity(2); + let mut phys_qargs = PhysQargsMap::new(qubit_map.clone()); + for block in &blocks { + let result = should_substitute( + dag, + decomposer.as_ref(), target, basis_gates.as_ref(), - inst.op.name(), - phys_qargs.get(dag, inst.qubits), - ) { - outside_basis = true; - } - } - if block_qargs.len() > 2 { - let mut qargs: Vec = block_qargs.iter().copied().collect(); - qargs.sort(); - let block_index_map: HashMap = qargs - .into_iter() - .enumerate() - .map(|(idx, qubit)| (qubit, idx)) - .collect(); - let circuit_data = CircuitData::from_packed_operations( - block_qargs.len() as u32, - 0, - block.iter().map(|node| { - let inst = dag[*node].unwrap_operation(); - - Ok(( - inst.op.clone(), - inst.params_view().iter().cloned().collect(), - dag.get_qargs(inst.qubits) - .iter() - .map(|x| Qubit::new(block_index_map[x])) - .collect(), - vec![], - )) - }), - Param::Float(0.), + basis_gate_name, + block, + &mut block_qargs, + &mut phys_qargs, + force_consolidate, )?; - let matrix = Python::attach(|py| -> PyResult<_> { - let circuit = circuit_data.into_py_quantum_circuit(py)?; - let matrix = QI_OPERATOR - .get_bound(py) - .call1((circuit,))? - .getattr(intern!(py, "data"))? - .extract::>()? - .as_array() - .to_owned(); - Ok(matrix) - })?; - let identity: Array2 = Array2::eye(2usize.pow(block_qargs.len() as u32)); - if approx::abs_diff_eq!(identity, matrix.view()) { - for node in block { - dag.remove_op_node(node); - } - } else { - let unitary_gate = UnitaryGate { - array: ArrayType::NDArray(matrix), - }; - let clbit_pos_map = HashMap::new(); - dag.replace_block( - &block, - PackedOperation::from_unitary(Box::new(unitary_gate)), - None, - None, - false, - &block_index_map, - &clbit_pos_map, - )?; - } - } else { - let block_index_map = [ - *block_qargs.iter().min().unwrap(), - *block_qargs.iter().max().unwrap(), - ]; - let matrix = blocks_to_matrix(dag, &block, block_index_map).ok(); - if let Some(matrix) = matrix { - let consolidate = if force_consolidate - || block.len() > MAX_2Q_DEPTH - || (basis_gates.is_some() && outside_basis) - || (target.is_some() && outside_basis) - { - true - } else { - let num_basis_gates = if let Some(ref decomposer) = decomposer { - match decomposer { - DecomposerType::TwoQubitBasis(decomp) => { - decomp.num_basis_gates_inner(nalgebra_array_view::< - Complex64, - U4, - U4, - >( - matrix.as_view() - ))? - } - DecomposerType::TwoQubitControlledU(decomp) => decomp - .num_basis_gates_inner(nalgebra_array_view::( - matrix.as_view(), - ))?, - } - } else { - unreachable!("A decomposer is always set unless force_consolidate is true"); - }; - num_basis_gates < basis_count - }; - - if consolidate { - if approx::abs_diff_eq!(IDENTITY_2Q, matrix) { - for node in block { - dag.remove_op_node(node); - } - } else { - let unitary_gate = UnitaryGate { - array: ArrayType::TwoQ(matrix), - }; - let qubit_pos_map = block_index_map - .into_iter() - .enumerate() - .map(|(idx, qubit)| (qubit, idx)) - .collect(); - let clbit_pos_map = HashMap::new(); - dag.replace_block( - &block, - PackedOperation::from_unitary(Box::new(unitary_gate)), - None, - None, - false, - &qubit_pos_map, - &clbit_pos_map, - )?; - } - } - } + apply_consolidation(dag, block, result)?; } } if let Some(runs) = runs { + let all_block_gates: HashSet = blocks.iter().flatten().copied().collect(); + let mut phys_qargs = PhysQargsMap::new(qubit_map); for run in runs { if run.iter().any(|node| all_block_gates.contains(node)) { continue; @@ -579,34 +673,50 @@ pub fn run_consolidate_blocks( target: Option<&Target>, ) -> PyResult<()> { let approximation_degree = approximation_degree.unwrap_or(1.0); - if force_consolidate { - py_run_consolidate_blocks( + let (decomposer, basis_gate): (Option, Option) = + if force_consolidate { + (None, None) + } else { + let (decomposer, basis_gate) = + get_decomposer_and_basis_gate(target, approximation_degree); + (Some(decomposer), Some(basis_gate)) + }; + let run_in_parallel = getenv_use_multiple_threads(); + let blocks = dag.collect_2q_runs().unwrap(); + if run_in_parallel { + let consolidations = consolidation_analysis_parallel( dag, - None, - "cx", - force_consolidate, + decomposer, target, None, + basis_gate.as_ref().map(|x| x.name()).unwrap_or("cx"), + &blocks, None, - None, - // TODO: this doesn't handle the possibility of control-flow operations yet. - None, - ) - } else { - let (decomposer, basis_gate) = get_decomposer_and_basis_gate(target, approximation_degree); - py_run_consolidate_blocks( - dag, - Some(decomposer), - basis_gate.name(), force_consolidate, - target, - None, - None, - None, - // TODO: this doesn't handle the possibility of control-flow operations yet. - None, - ) + )?; + for (block, result) in consolidations { + apply_consolidation(dag, block, result)?; + } + } else { + // In most cases, the qargs in a block will not exceed 2 qubits. + let mut block_qargs: HashSet = HashSet::with_capacity(2); + let mut phys_qargs = PhysQargsMap::new(None); + for block in &blocks { + let result = should_substitute( + dag, + decomposer.as_ref(), + target, + None, + basis_gate.as_ref().map(|x| x.name()).unwrap_or("cx"), + block, + &mut block_qargs, + &mut phys_qargs, + force_consolidate, + )?; + apply_consolidation(dag, block, result)?; + } } + Ok(()) } pub fn consolidate_blocks_mod(m: &Bound) -> PyResult<()> { From 6ea96e157802b1b514cb8e7b9db081cda6571a46 Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Wed, 3 Jun 2026 12:00:32 -0400 Subject: [PATCH 2/2] Add release note --- ...-in-rust-with-rayon-multithreading-f612d28d7135dcad.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 releasenotes/notes/consolidateblocks-is-parallel-in-rust-with-rayon-multithreading-f612d28d7135dcad.yaml diff --git a/releasenotes/notes/consolidateblocks-is-parallel-in-rust-with-rayon-multithreading-f612d28d7135dcad.yaml b/releasenotes/notes/consolidateblocks-is-parallel-in-rust-with-rayon-multithreading-f612d28d7135dcad.yaml new file mode 100644 index 000000000000..0d2258b40412 --- /dev/null +++ b/releasenotes/notes/consolidateblocks-is-parallel-in-rust-with-rayon-multithreading-f612d28d7135dcad.yaml @@ -0,0 +1,6 @@ +--- +performance: + - The :class:`.ConsolidateBlocks` transpiler pass, and its + C API function :c:func:`qk_transpiler_pass_standalone_consolidate_blocks`, + is now multithreaded which results in improved runtime performance. See + `#16230 `__ for more details.