diff --git a/crates/transpiler/src/passes/consolidate_blocks.rs b/crates/transpiler/src/passes/consolidate_blocks.rs index 67b9e461ef26..e871a6097683 100644 --- a/crates/transpiler/src/passes/consolidate_blocks.rs +++ b/crates/transpiler/src/passes/consolidate_blocks.rs @@ -10,6 +10,11 @@ // copyright notice, and modified files need to carry a notice indicating // that they have been altered from the originals. +use std::cell::RefCell; + +use rayon::prelude::*; +use thread_local::ThreadLocal; + use super::optimize_1q_gates_decomposition::matmul_1q; use hashbrown::{HashMap, HashSet}; use nalgebra::{Matrix2, Matrix4, U4}; @@ -46,6 +51,7 @@ use smallvec::SmallVec; use crate::passes::unitary_synthesis::{PARAM_SET, TWO_QUBIT_BASIS_SET}; use crate::target::{Qargs, Target}; use qiskit_circuit::PhysicalQubit; +use qiskit_util::getenv_use_multiple_threads; static IDENTITY_2Q: Matrix4 = Matrix4::new( // Row 1 @@ -219,10 +225,252 @@ impl PhysQargsMap { } } +enum ConsolidateResult { + Identity, + NoConsolidate, + Matrix(HashMap, UnitaryGate), + Replace(UnitaryGate), +} + +#[allow(clippy::too_many_arguments)] +fn should_substitute( + dag: &DAGCircuit, + decomposer: Option<&DecomposerType>, + target: Option<&Target>, + basis_gates: Option<&HashSet>, + basis_gate_name: &str, + block: &[NodeIndex], + block_qargs: &mut HashSet, + phys_qargs: &mut PhysQargsMap, + force_consolidate: bool, +) -> PyResult { + block_qargs.clear(); + if block.len() == 1 { + let inst_node = block[0]; + let inst = dag[inst_node].unwrap_operation(); + if !is_supported( + target, + basis_gates, + inst.op.name(), + phys_qargs.get(dag, inst.qubits), + ) { + let num_qubits = inst.op.num_qubits(); + let unitary_gate = if num_qubits == 1 { + let matrix = match get_1q_matrix_from_inst(inst) { + Ok(mat) => mat, + Err(_) => return Ok(ConsolidateResult::NoConsolidate), + }; + UnitaryGate { + array: ArrayType::OneQ(matrix), + } + } else if num_qubits == 2 { + let matrix = match get_2q_matrix_from_inst(inst) { + 
Ok(mat) => mat, + Err(_) => return Ok(ConsolidateResult::NoConsolidate), + }; + UnitaryGate { + array: ArrayType::TwoQ(matrix), + } + } else { + let matrix = match get_matrix_from_inst(inst) { + Ok(mat) => mat, + Err(_) => return Ok(ConsolidateResult::NoConsolidate), + }; + UnitaryGate { + array: ArrayType::NDArray(matrix), + } + }; + return Ok(ConsolidateResult::Replace(unitary_gate)); + } + } + let mut basis_count: usize = 0; + let mut outside_basis = false; + for node in block { + let inst = dag[*node].unwrap_operation(); + block_qargs.extend(dag.get_qargs(inst.qubits)); + if inst.op.name() == basis_gate_name { + basis_count += 1; + } + if !is_supported( + target, + basis_gates, + inst.op.name(), + phys_qargs.get(dag, inst.qubits), + ) { + outside_basis = true; + } + } + if block_qargs.len() > 2 { + let mut qargs: Vec = block_qargs.iter().copied().collect(); + qargs.sort(); + let block_index_map: HashMap = qargs + .into_iter() + .enumerate() + .map(|(idx, qubit)| (qubit, idx)) + .collect(); + let circuit_data = CircuitData::from_packed_operations( + block_qargs.len() as u32, + 0, + block.iter().map(|node| { + let inst = dag[*node].unwrap_operation(); + + Ok(( + inst.op.clone(), + inst.params_view().iter().cloned().collect(), + dag.get_qargs(inst.qubits) + .iter() + .map(|x| Qubit::new(block_index_map[x])) + .collect(), + vec![], + )) + }), + Param::Float(0.), + )?; + let matrix = Python::attach(|py| -> PyResult<_> { + let circuit = circuit_data.into_py_quantum_circuit(py)?; + let matrix = QI_OPERATOR + .get_bound(py) + .call1((circuit,))? + .getattr(intern!(py, "data"))? + .extract::>()? 
+ .as_array() + .to_owned(); + Ok(matrix) + })?; + let identity: Array2 = Array2::eye(2usize.pow(block_qargs.len() as u32)); + if approx::abs_diff_eq!(identity, matrix.view()) { + return Ok(ConsolidateResult::Identity); + } else { + let unitary_gate = UnitaryGate { + array: ArrayType::NDArray(matrix), + }; + return Ok(ConsolidateResult::Matrix(block_index_map, unitary_gate)); + } + } else { + let block_index_map = [ + *block_qargs.iter().min().unwrap(), + *block_qargs.iter().max().unwrap(), + ]; + let matrix = blocks_to_matrix(dag, block, block_index_map).ok(); + if let Some(matrix) = matrix { + let consolidate = if force_consolidate + || block.len() > MAX_2Q_DEPTH + || (basis_gates.is_some() && outside_basis) + || (target.is_some() && outside_basis) + { + true + } else { + let num_basis_gates = if let Some(ref decomposer) = decomposer { + match decomposer { + DecomposerType::TwoQubitBasis(decomp) => decomp.num_basis_gates_inner( + nalgebra_array_view::(matrix.as_view()), + )?, + DecomposerType::TwoQubitControlledU(decomp) => decomp + .num_basis_gates_inner(nalgebra_array_view::( + matrix.as_view(), + ))?, + } + } else { + unreachable!("A decomposer is always set unless force_consolidate is true"); + }; + num_basis_gates < basis_count + }; + if consolidate { + if approx::abs_diff_eq!(IDENTITY_2Q, matrix) { + return Ok(ConsolidateResult::Identity); + } else { + let unitary_gate = UnitaryGate { + array: ArrayType::TwoQ(matrix), + }; + let qubit_pos_map = block_index_map + .into_iter() + .enumerate() + .map(|(idx, qubit)| (qubit, idx)) + .collect(); + return Ok(ConsolidateResult::Matrix(qubit_pos_map, unitary_gate)); + } + } + } + } + Ok(ConsolidateResult::NoConsolidate) +} + +#[allow(clippy::too_many_arguments)] +fn consolidation_analysis_parallel<'a>( + dag: &DAGCircuit, + decomposer: Option, + target: Option<&Target>, + basis_gates: Option<&HashSet>, + basis_gate_name: &str, + blocks: &'a [Vec], + qubit_map: Option>, + force_consolidate: bool, +) -> PyResult> { + 
let block_qargs = ThreadLocal::new(); + let phys_qargs = ThreadLocal::new(); + blocks + .par_iter() + .map(|block| { + let block_qargs = block_qargs.get_or(|| RefCell::new(HashSet::with_capacity(2))); + let phys_qargs = + phys_qargs.get_or(|| RefCell::new(PhysQargsMap::new(qubit_map.clone()))); + let consolidate_result = should_substitute( + dag, + decomposer.as_ref(), + target, + basis_gates, + basis_gate_name, + block, + &mut block_qargs.borrow_mut(), + &mut phys_qargs.borrow_mut(), + force_consolidate, + )?; + Ok((block.as_slice(), consolidate_result)) + }) + .collect() +} + +fn apply_consolidation( + dag: &mut DAGCircuit, + block: &[NodeIndex], + consolidation: ConsolidateResult, +) -> PyResult<()> { + match consolidation { + ConsolidateResult::NoConsolidate => {} + ConsolidateResult::Identity => { + for index in block { + dag.remove_op_node(*index); + } + } + ConsolidateResult::Matrix(block_index_map, unitary_gate) => { + let clbit_pos_map = HashMap::new(); + dag.replace_block( + block, + PackedOperation::from_unitary(Box::new(unitary_gate)), + None, + None, + false, + &block_index_map, + &clbit_pos_map, + )?; + } + ConsolidateResult::Replace(unitary_gate) => { + dag.substitute_op( + block[0], + PackedOperation::from_unitary(Box::new(unitary_gate)), + None, + None, + )?; + } + } + Ok(()) +} + #[allow(clippy::too_many_arguments)] #[pyfunction] #[pyo3(name = "consolidate_blocks", signature = (dag, decomposer, basis_gate_name, force_consolidate, target=None, basis_gates=None, blocks=None, runs=None, qubit_map=None))] fn py_run_consolidate_blocks( + py: Python, dag: &mut DAGCircuit, decomposer: Option, basis_gate_name: &str, @@ -280,199 +528,45 @@ fn py_run_consolidate_blocks( .collect::, _>>() }) .transpose()?; - let mut all_block_gates: HashSet = - HashSet::with_capacity(blocks.iter().map(|x| x.len()).sum()); - // In most cases, the qargs in a block will not exceed 2 qubits. 
- let mut block_qargs: HashSet = HashSet::with_capacity(2); - let mut phys_qargs = PhysQargsMap::new(qubit_map); - for block in blocks { - block_qargs.clear(); - if block.len() == 1 { - let inst_node = block[0]; - let inst = dag[inst_node].unwrap_operation(); - if !is_supported( + let run_in_parallel = getenv_use_multiple_threads(); + if run_in_parallel && blocks.len() > 50 { + let consolidations = py.detach(|| { + consolidation_analysis_parallel( + dag, + decomposer, target, basis_gates.as_ref(), - inst.op.name(), - phys_qargs.get(dag, inst.qubits), - ) { - all_block_gates.insert(inst_node); - let num_qubits = inst.op.num_qubits(); - let unitary_gate = if num_qubits == 1 { - let matrix = match get_1q_matrix_from_inst(inst) { - Ok(mat) => mat, - Err(_) => continue, - }; - UnitaryGate { - array: ArrayType::OneQ(matrix), - } - } else if num_qubits == 2 { - let matrix = match get_2q_matrix_from_inst(inst) { - Ok(mat) => mat, - Err(_) => continue, - }; - UnitaryGate { - array: ArrayType::TwoQ(matrix), - } - } else { - let matrix = match get_matrix_from_inst(inst) { - Ok(mat) => mat, - Err(_) => continue, - }; - UnitaryGate { - array: ArrayType::NDArray(matrix), - } - }; - dag.substitute_op( - inst_node, - PackedOperation::from_unitary(Box::new(unitary_gate)), - None, - None, - )?; - continue; - } + basis_gate_name, + &blocks, + qubit_map.clone(), + force_consolidate, + ) + })?; + for (block, result) in consolidations { + apply_consolidation(dag, block, result)?; } - let mut basis_count: usize = 0; - let mut outside_basis = false; - for node in &block { - let inst = dag[*node].unwrap_operation(); - block_qargs.extend(dag.get_qargs(inst.qubits)); - all_block_gates.insert(*node); - if inst.op.name() == basis_gate_name { - basis_count += 1; - } - if !is_supported( + } else { + // In most cases, the qargs in a block will not exceed 2 qubits. 
+ let mut block_qargs: HashSet = HashSet::with_capacity(2); + let mut phys_qargs = PhysQargsMap::new(qubit_map.clone()); + for block in &blocks { + let result = should_substitute( + dag, + decomposer.as_ref(), target, basis_gates.as_ref(), - inst.op.name(), - phys_qargs.get(dag, inst.qubits), - ) { - outside_basis = true; - } - } - if block_qargs.len() > 2 { - let mut qargs: Vec = block_qargs.iter().copied().collect(); - qargs.sort(); - let block_index_map: HashMap = qargs - .into_iter() - .enumerate() - .map(|(idx, qubit)| (qubit, idx)) - .collect(); - let circuit_data = CircuitData::from_packed_operations( - block_qargs.len() as u32, - 0, - block.iter().map(|node| { - let inst = dag[*node].unwrap_operation(); - - Ok(( - inst.op.clone(), - inst.params_view().iter().cloned().collect(), - dag.get_qargs(inst.qubits) - .iter() - .map(|x| Qubit::new(block_index_map[x])) - .collect(), - vec![], - )) - }), - Param::Float(0.), + basis_gate_name, + block, + &mut block_qargs, + &mut phys_qargs, + force_consolidate, )?; - let matrix = Python::attach(|py| -> PyResult<_> { - let circuit = circuit_data.into_py_quantum_circuit(py)?; - let matrix = QI_OPERATOR - .get_bound(py) - .call1((circuit,))? - .getattr(intern!(py, "data"))? - .extract::>()? 
- .as_array() - .to_owned(); - Ok(matrix) - })?; - let identity: Array2 = Array2::eye(2usize.pow(block_qargs.len() as u32)); - if approx::abs_diff_eq!(identity, matrix.view()) { - for node in block { - dag.remove_op_node(node); - } - } else { - let unitary_gate = UnitaryGate { - array: ArrayType::NDArray(matrix), - }; - let clbit_pos_map = HashMap::new(); - dag.replace_block( - &block, - PackedOperation::from_unitary(Box::new(unitary_gate)), - None, - None, - false, - &block_index_map, - &clbit_pos_map, - )?; - } - } else { - let block_index_map = [ - *block_qargs.iter().min().unwrap(), - *block_qargs.iter().max().unwrap(), - ]; - let matrix = blocks_to_matrix(dag, &block, block_index_map).ok(); - if let Some(matrix) = matrix { - let consolidate = if force_consolidate - || block.len() > MAX_2Q_DEPTH - || (basis_gates.is_some() && outside_basis) - || (target.is_some() && outside_basis) - { - true - } else { - let num_basis_gates = if let Some(ref decomposer) = decomposer { - match decomposer { - DecomposerType::TwoQubitBasis(decomp) => { - decomp.num_basis_gates_inner(nalgebra_array_view::< - Complex64, - U4, - U4, - >( - matrix.as_view() - ))? 
- } - DecomposerType::TwoQubitControlledU(decomp) => decomp - .num_basis_gates_inner(nalgebra_array_view::( - matrix.as_view(), - ))?, - } - } else { - unreachable!("A decomposer is always set unless force_consolidate is true"); - }; - num_basis_gates < basis_count - }; - - if consolidate { - if approx::abs_diff_eq!(IDENTITY_2Q, matrix) { - for node in block { - dag.remove_op_node(node); - } - } else { - let unitary_gate = UnitaryGate { - array: ArrayType::TwoQ(matrix), - }; - let qubit_pos_map = block_index_map - .into_iter() - .enumerate() - .map(|(idx, qubit)| (qubit, idx)) - .collect(); - let clbit_pos_map = HashMap::new(); - dag.replace_block( - &block, - PackedOperation::from_unitary(Box::new(unitary_gate)), - None, - None, - false, - &qubit_pos_map, - &clbit_pos_map, - )?; - } - } - } + apply_consolidation(dag, block, result)?; } } if let Some(runs) = runs { + let all_block_gates: HashSet = blocks.iter().flatten().copied().collect(); + let mut phys_qargs = PhysQargsMap::new(qubit_map); for run in runs { if run.iter().any(|node| all_block_gates.contains(node)) { continue; @@ -579,34 +673,50 @@ pub fn run_consolidate_blocks( target: Option<&Target>, ) -> PyResult<()> { let approximation_degree = approximation_degree.unwrap_or(1.0); - if force_consolidate { - py_run_consolidate_blocks( + let (decomposer, basis_gate): (Option, Option) = + if force_consolidate { + (None, None) + } else { + let (decomposer, basis_gate) = + get_decomposer_and_basis_gate(target, approximation_degree); + (Some(decomposer), Some(basis_gate)) + }; + let run_in_parallel = getenv_use_multiple_threads(); + let blocks = dag.collect_2q_runs().unwrap(); + if run_in_parallel { + let consolidations = consolidation_analysis_parallel( dag, - None, - "cx", - force_consolidate, + decomposer, target, None, + basis_gate.as_ref().map(|x| x.name()).unwrap_or("cx"), + &blocks, None, - None, - // TODO: this doesn't handle the possibility of control-flow operations yet. 
- None, - ) - } else { - let (decomposer, basis_gate) = get_decomposer_and_basis_gate(target, approximation_degree); - py_run_consolidate_blocks( - dag, - Some(decomposer), - basis_gate.name(), force_consolidate, - target, - None, - None, - None, - // TODO: this doesn't handle the possibility of control-flow operations yet. - None, - ) + )?; + for (block, result) in consolidations { + apply_consolidation(dag, block, result)?; + } + } else { + // In most cases, the qargs in a block will not exceed 2 qubits. + let mut block_qargs: HashSet = HashSet::with_capacity(2); + let mut phys_qargs = PhysQargsMap::new(None); + for block in &blocks { + let result = should_substitute( + dag, + decomposer.as_ref(), + target, + None, + basis_gate.as_ref().map(|x| x.name()).unwrap_or("cx"), + block, + &mut block_qargs, + &mut phys_qargs, + force_consolidate, + )?; + apply_consolidation(dag, block, result)?; + } } + Ok(()) } pub fn consolidate_blocks_mod(m: &Bound) -> PyResult<()> { diff --git a/releasenotes/notes/consolidateblocks-is-parallel-in-rust-with-rayon-multithreading-f612d28d7135dcad.yaml b/releasenotes/notes/consolidateblocks-is-parallel-in-rust-with-rayon-multithreading-f612d28d7135dcad.yaml new file mode 100644 index 000000000000..0d2258b40412 --- /dev/null +++ b/releasenotes/notes/consolidateblocks-is-parallel-in-rust-with-rayon-multithreading-f612d28d7135dcad.yaml @@ -0,0 +1,6 @@ +--- +performance: + - The :class:`.ConsolidateBlocks` transpiler pass, and its + C API function :c:func:`qk_transpiler_pass_standalone_consolidate_blocks`, + is now multithreaded, which results in improved runtime performance. See + `#16230 <https://github.com/Qiskit/qiskit/pull/16230>`__ for more details.