Skip to content

Commit ac670fe

Browse files
committed
Add array batching for Qiskit parallel execution (Phase 2)
Replace the single-circuit-per-partition model with array batching: each partition holds multiple circuits, and ParallelExperiment zips them into composite circuits that are submitted as one job. This eliminates the need for separate job submissions when there are more circuits than partitions.

Key changes:
- _CircuitArrayExperiment: returns N circuits per partition
- _assign_to_partitions(): round-robin distribution with reorder map
- No longer crashes when circuits > partitions (array handles overflow)
- Pre-transpile fallback updated for partition arrays
- Updated PLAN_execute_parallel_batching.md with phase status
1 parent a4e677a commit ac670fe

1 file changed

Lines changed: 140 additions & 109 deletions

File tree

qedclib/qiskit/execute_parallel.py

Lines changed: 140 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -219,17 +219,42 @@ def _grow(start, size):
219219
return selected
220220

221221

222+
def _assign_to_partitions(circuits, num_partitions):
223+
"""
224+
Distribute circuits round-robin across partitions.
225+
226+
Returns:
227+
partition_arrays: list of lists — partition_arrays[p] contains the
228+
circuits assigned to partition p.
229+
assignment_map: list of lists — assignment_map[p][i] is the original
230+
index (in the input circuits list) of the i-th circuit in partition p.
231+
"""
232+
partition_arrays = [[] for _ in range(num_partitions)]
233+
assignment_map = [[] for _ in range(num_partitions)]
234+
for orig_idx, circ in enumerate(circuits):
235+
p = orig_idx % num_partitions
236+
assignment_map[p].append(orig_idx)
237+
partition_arrays[p].append(circ)
238+
return partition_arrays, assignment_map
239+
240+
222241
def _run_qiskit_parallel_experiment(circuits, num_shots):
223242
"""
224-
Execute a batch of circuits using Qiskit's ParallelExperiment with simple
225-
sequential qubit allocation. Qiskit's transpiler handles layout/routing.
243+
Execute circuits using Qiskit's ParallelExperiment with array batching.
226244
227-
Each circuit is assigned a disjoint range of physical qubits:
228-
circuit 0 → qubits [0, w0)
229-
circuit 1 → qubits [w0 + spacing, w0 + spacing + w1)
230-
...
245+
Circuits are distributed round-robin across available hardware partitions.
246+
Each partition holds an array of circuits, and ParallelExperiment composes
247+
one wide circuit per "round" (zipping the i-th circuit from each partition).
248+
All rounds are submitted as a single job.
231249
232-
The returned value is a list of count dictionaries, one per input circuit.
250+
Example: 12 circuits, 3 partitions →
251+
Partition A: [c0, c3, c6, c9]
252+
Partition B: [c1, c4, c7, c10]
253+
Partition C: [c2, c5, c8, c11]
254+
→ 4 composite circuits, 1 job submission, 12 results
255+
256+
The returned value is a list of count dictionaries, one per input circuit,
257+
in the original input order.
233258
"""
234259
import time
235260
from qiskit_experiments.framework import ParallelExperiment, BaseExperiment, BaseAnalysis
@@ -265,104 +290,102 @@ def _run_qiskit_parallel_experiment(circuits, num_shots):
265290

266291
# For same-width circuits, use topology partitioning if coupling map available
267292
widths = [c.num_qubits for c in circuits]
293+
circuit_width = widths[0] # Phase 2 assumes same-width input
268294
backend_target = getattr(run_backend, 'target', None)
269295
partitions = []
270296
alloc_gap = spacing
271-
routing_buffer = 0 # ParallelExperiment requires physical_qubits == circuit size
272-
if coupling_map is not None and len(set(widths)) == 1:
273-
# Try with decreasing gap until we find enough partitions
297+
298+
if coupling_map is not None:
299+
# Hardware: topology-aware partitioning with gap retry.
300+
# We no longer require partitions >= circuits — array batching
301+
# distributes circuits across however many partitions we find.
274302
for try_gap in [spacing, 1, 0]:
275303
partitions = _find_topology_partitions(
276-
coupling_map, widths[0], len(circuits), gap=try_gap,
304+
coupling_map, circuit_width, len(circuits), gap=try_gap,
277305
backend_target=backend_target,
278-
routing_buffer=routing_buffer
306+
routing_buffer=0
279307
)
280308
alloc_gap = try_gap
281309
if len(partitions) >= len(circuits):
282310
break
283311
if try_gap > 0:
284312
print(f"... topology with gap={try_gap} found {len(partitions)} of "
285313
f"{len(circuits)} needed, retrying with gap={max(try_gap-1, 0)}")
286-
287-
if len(partitions) >= len(circuits):
288-
# Use topology-aware partitions
289-
physical_qubits_per_circuit = partitions[:len(circuits)]
314+
if not partitions:
315+
raise RuntimeError(
316+
f"Cannot find any {circuit_width}-qubit connected subgraphs "
317+
f"on {device_qubits}-qubit device")
290318
alloc_method = f"topology(gap={alloc_gap})"
291-
elif coupling_map is not None and partitions:
292-
# Hardware backend but not enough partitions even at gap=0:
293-
# use what we found and raise to trigger fallback to non-parallel
294-
print(f"... topology partitioning found only {len(partitions)} of "
295-
f"{len(circuits)} needed even at gap=0")
296-
print(f"... running {len(partitions)} circuits in parallel, "
297-
f"remaining {len(circuits) - len(partitions)} will use fallback")
298-
raise RuntimeError(
299-
f"Cannot fit {len(circuits)} circuits: only {len(partitions)} "
300-
f"partitions available on {device_qubits}-qubit device")
301-
elif coupling_map is None:
302-
# Simulator: sequential allocation is safe (all-to-all connectivity)
303-
physical_qubits_per_circuit = []
319+
else:
320+
# Simulator: sequential allocation (all-to-all connectivity)
321+
max_seq = device_qubits // (circuit_width + spacing) if circuit_width + spacing > 0 else 1
322+
num_partitions = min(max_seq, len(circuits))
323+
if num_partitions == 0:
324+
raise RuntimeError(
325+
f"Circuit width {circuit_width} + spacing {spacing} exceeds "
326+
f"device size {device_qubits}")
304327
offset = 0
305-
for circ in circuits:
306-
w = circ.num_qubits
307-
if offset + w > device_qubits:
308-
raise RuntimeError(
309-
f"Circuits do not fit: need {offset + w} qubits, "
310-
f"device has {device_qubits}")
311-
physical_qubits_per_circuit.append(tuple(range(offset, offset + w)))
312-
offset += w + spacing
328+
for _ in range(num_partitions):
329+
partitions.append(tuple(range(offset, offset + circuit_width)))
330+
offset += circuit_width + spacing
313331
alloc_method = "sequential"
314-
else:
315-
# Hardware backend, no partitions found at all
316-
raise RuntimeError(
317-
f"Cannot find any {widths[0]}-qubit connected subgraphs "
318-
f"on {device_qubits}-qubit device")
332+
333+
# Distribute circuits round-robin across the available partitions.
334+
# If we have more circuits than partitions, each partition gets an array
335+
# of circuits — ParallelExperiment zips them into composite circuits.
336+
partition_arrays, assignment_map = _assign_to_partitions(circuits, len(partitions))
319337

320338
t1 = time.time()
321-
qubits_used = max(max(p) for p in physical_qubits_per_circuit) + 1 if physical_qubits_per_circuit else 0
322-
partition_size = len(physical_qubits_per_circuit[0]) if physical_qubits_per_circuit else 0
323-
buf_msg = f", routing_buffer={routing_buffer}" if routing_buffer > 0 and coupling_map is not None else ""
339+
qubits_used = max(max(p) for p in partitions) + 1 if partitions else 0
340+
rounds = max(len(arr) for arr in partition_arrays)
324341
print(f"... [timing] qubit allocation ({alloc_method}): {t1-t0:.3f}s "
325-
f"({len(circuits)} circuits, {partition_size}q partitions{buf_msg}, "
342+
f"({len(circuits)} circuits across {len(partitions)} partitions, "
343+
f"{rounds} rounds, {circuit_width}q each, "
326344
f"{qubits_used} qubits used / {device_qubits})")
327-
if alloc_method.startswith("topology") and len(circuits) <= 6:
328-
for i, p in enumerate(physical_qubits_per_circuit):
329-
print(f"... circuit {i} ({widths[i]}q) → {partition_size}q region {p}")
330-
331-
# Minimal CircuitExperiment wrapper — no analysis needed
345+
if len(partitions) <= 6:
346+
for p_idx, partition in enumerate(partitions):
347+
print(f"... partition {p_idx}: {partition} "
348+
f"({len(partition_arrays[p_idx])} circuits)")
349+
350+
# Minimal experiment wrapper — no analysis needed.
351+
# _CircuitArrayExperiment holds an array of circuits per partition.
352+
# ParallelExperiment zips by index: composite[i] runs the i-th circuit
353+
# from each partition simultaneously. All composites submit as one job.
332354
class _NoAnalysis(BaseAnalysis):
333355
def _run_analysis(self, experiment_data):
334356
return [], []
335357

336-
class _CircuitExperiment(BaseExperiment):
337-
def __init__(self, circuit, physical_qubits, label):
358+
class _CircuitArrayExperiment(BaseExperiment):
359+
def __init__(self, circuits, physical_qubits, label):
338360
super().__init__(
339361
physical_qubits=physical_qubits,
340362
analysis=_NoAnalysis(),
341363
backend=None,
342364
)
343-
# Keep the original circuit as-is, including its measurements.
344-
# No need to remove/re-add measurements — ParallelExperiment
345-
# handles the qubit remapping.
346-
self._circuit = circuit
365+
self._circuits = circuits
347366
self._label = label
348367

349368
def circuits(self):
350-
qc = self._circuit.copy()
351-
qc.name = self._label
352-
qc.metadata = {
353-
"component": self._label,
354-
"physical_qubits": self.physical_qubits,
355-
}
356-
return [qc]
357-
358-
# Build experiments with original circuits.
369+
result = []
370+
for i, circ in enumerate(self._circuits):
371+
qc = circ.copy()
372+
qc.name = f"{self._label}_{i}"
373+
qc.metadata = {
374+
"component": self._label,
375+
"index": i,
376+
"physical_qubits": self.physical_qubits,
377+
}
378+
result.append(qc)
379+
return result
380+
381+
# Build one experiment per partition, each holding its array of circuits.
359382
experiments = [
360-
_CircuitExperiment(
361-
circuit=circuits[i],
362-
physical_qubits=physical_qubits_per_circuit[i],
363-
label=getattr(circuits[i], "name", f"circuit_{i}"),
383+
_CircuitArrayExperiment(
384+
circuits=partition_arrays[p],
385+
physical_qubits=partitions[p],
386+
label=f"partition_{p}",
364387
)
365-
for i in range(len(circuits))
388+
for p in range(len(partitions))
366389
]
367390

368391
parallel = ParallelExperiment(
@@ -391,31 +414,34 @@ def circuits(self):
391414
from qiskit import transpile
392415

393416
full_edges = coupling_map.get_edges() if coupling_map is not None else []
394-
transpiled_circuits = []
395-
for i, (circ, partition) in enumerate(zip(circuits, physical_qubits_per_circuit)):
417+
418+
# Pre-transpile each partition's circuit array onto restricted coupling maps
419+
transpiled_partition_arrays = []
420+
for p, partition in enumerate(partitions):
396421
partition_set = set(partition)
397-
phys_to_local = {p: idx for idx, p in enumerate(partition)}
422+
phys_to_local = {ph: idx for idx, ph in enumerate(partition)}
398423
local_edges = [
399424
(phys_to_local[u], phys_to_local[v])
400425
for u, v in full_edges
401426
if u in partition_set and v in partition_set
402427
]
403428
local_coupling = CouplingMap(local_edges) if local_edges else None
404-
transpiled = transpile(
405-
circ, coupling_map=local_coupling, optimization_level=1
406-
)
407-
transpiled_circuits.append(transpiled)
429+
transpiled_array = [
430+
transpile(circ, coupling_map=local_coupling, optimization_level=1)
431+
for circ in partition_arrays[p]
432+
]
433+
transpiled_partition_arrays.append(transpiled_array)
408434

409435
t_retry = time.time()
410436
print(f"... [timing] pre-transpile onto restricted maps: {t_retry-t2:.3f}s")
411437

412438
experiments = [
413-
_CircuitExperiment(
414-
circuit=transpiled_circuits[i],
415-
physical_qubits=physical_qubits_per_circuit[i],
416-
label=getattr(circuits[i], "name", f"circuit_{i}"),
439+
_CircuitArrayExperiment(
440+
circuits=transpiled_partition_arrays[p],
441+
physical_qubits=partitions[p],
442+
label=f"partition_{p}",
417443
)
418-
for i in range(len(circuits))
444+
for p in range(len(partitions))
419445
]
420446
parallel = ParallelExperiment(
421447
experiments=experiments,
@@ -430,21 +456,27 @@ def circuits(self):
430456
t3 = time.time()
431457
print(f"... [timing] parallel.run + block_for_results: {t3-t2:.1f}s")
432458

433-
# Extract per-circuit counts
434-
counts_list = []
435-
for i, child in enumerate(expdata.child_data()):
436-
datum = child.data(0)
437-
counts = datum.get("counts", datum)
438-
counts_list.append(counts)
439-
# Debug: show raw counts for first few circuits
440-
if i < 3:
441-
sample_keys = list(counts.keys())[:4] if isinstance(counts, dict) else str(type(counts))
442-
print(f"... [debug] child {i}: {len(counts)} entries, "
443-
f"key_len={len(sample_keys[0]) if sample_keys else '?'}, "
444-
f"samples={sample_keys}, "
445-
f"expected_qubits={circuits[i].num_qubits}")
459+
# Extract per-circuit counts and reorder to original circuit order.
460+
# child_data() returns one child per partition; each child has one data
461+
# entry per circuit in that partition's array.
462+
counts_list = [None] * len(circuits)
463+
for partition_idx, child in enumerate(expdata.child_data()):
464+
for circuit_idx in range(len(assignment_map[partition_idx])):
465+
datum = child.data(circuit_idx)
466+
counts = datum.get("counts", datum)
467+
original_idx = assignment_map[partition_idx][circuit_idx]
468+
counts_list[original_idx] = counts
469+
470+
# Debug: show first few results
471+
for i in range(min(3, len(counts_list))):
472+
counts = counts_list[i]
473+
if counts is not None and isinstance(counts, dict):
474+
sample_keys = list(counts.keys())[:4]
475+
print(f"... [debug] circuit {i} ({circuits[i].num_qubits}q): "
476+
f"{len(counts)} entries, samples={sample_keys}")
446477

447-
print(f"... [timing] total _run_qiskit_parallel_experiment: {t3-t0:.1f}s")
478+
print(f"... [timing] total _run_qiskit_parallel_experiment: {t3-t0:.1f}s "
479+
f"({len(circuits)} circuits, {len(partitions)} partitions, {rounds} rounds)")
448480

449481
return counts_list
450482

@@ -641,22 +673,24 @@ def _localize_counts(counts, num_qubits):
641673

642674
def execute_circuits_parallel(circuits, num_shots):
643675
"""
644-
Execute a list of QED-C circuits using the integrated Qiskit
645-
ParallelExperiment path.
676+
Execute a list of QED-C circuits using Qiskit ParallelExperiment with
677+
array batching.
646678
647679
This function is the QED-C entry point for parallel circuit execution.
648-
Instead of calling QED-C's normal sequential execute_circuits() path, it
649-
attempts to run the input circuits together as one parallel experiment.
680+
It distributes the input circuits across available hardware partitions
681+
using round-robin assignment, submits all circuits as a single
682+
ParallelExperiment job, and returns results in the original circuit order.
650683
651684
The flow is:
652685
653686
1. Receive a list of QED-C generated QuantumCircuit objects.
654687
2. Call _run_qiskit_parallel_experiment(), which:
655-
- removes final measurements,
656-
- partitions the hardware into disjoint qubit regions,
657-
- maps each circuit to one region,
658-
- wraps each circuit as a Qiskit Experiment,
659-
- runs them together with ParallelExperiment.
688+
- finds disjoint qubit partitions (topology+error aware)
689+
- distributes circuits round-robin across partitions
690+
- wraps each partition as a _CircuitArrayExperiment
691+
- ParallelExperiment zips arrays into composite circuits
692+
- submits all composites as one job
693+
- extracts and reorders results to original circuit order
660694
3. Convert the returned counts into QED-C's expected per-circuit
661695
bitstring format.
662696
4. Wrap the counts in QED-C's ExecutionResult object.
@@ -685,9 +719,6 @@ def execute_circuits_parallel(circuits, num_shots):
685719
print(f">>> execute_circuits_parallel [qiskit]: {len(circuits)} circuits, {num_shots} shots")
686720

687721
try:
688-
# Run the circuits through the custom multiprogramming +
689-
# Qiskit ParallelExperiment pipeline.
690-
print("Uses the integrated Qiskit ParallelExperiment workflow. If the parallel path fails, execution automatically falls back to the standard QED-C execution path.")
691722
counts_list = _run_qiskit_parallel_experiment(circuits, num_shots)
692723

693724
# Debug: show counts before and after localization

0 commit comments

Comments
 (0)