forked from quantumlib/Cirq
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdynamical_decoupling.py
More file actions
306 lines (255 loc) · 13.2 KB
/
dynamical_decoupling.py
File metadata and controls
306 lines (255 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# Copyright 2024 The Cirq Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Transformer pass that adds dynamical decoupling operations to a circuit."""
from __future__ import annotations
from functools import reduce
from itertools import cycle
from typing import TYPE_CHECKING
import numpy as np
from cirq import ops, protocols
from cirq.circuits import Circuit, FrozenCircuit, Moment
from cirq.protocols import unitary_protocol
from cirq.protocols.has_stabilizer_effect_protocol import has_stabilizer_effect
from cirq.protocols.has_unitary_protocol import has_unitary
from cirq.transformers import transformer_api
from cirq.transformers.analytical_decompositions import single_qubit_decompositions
if TYPE_CHECKING:
import cirq
def _get_dd_sequence_from_schema_name(schema: str) -> tuple[ops.Gate, ...]:
"""Gets dynamical decoupling sequence from a schema name."""
match schema:
case 'DEFAULT':
return (ops.X, ops.Y, ops.X, ops.Y)
case 'XX_PAIR':
return (ops.X, ops.X)
case 'X_XINV':
return (ops.X, ops.X**-1)
case 'YY_PAIR':
return (ops.Y, ops.Y)
case 'Y_YINV':
return (ops.Y, ops.Y**-1)
case _:
raise ValueError('Invalid schema name.')
def _pauli_up_to_global_phase(gate: ops.Gate) -> ops.Pauli | None:
    """Returns the Pauli gate that `gate` equals up to a global phase, if any.

    Candidates are tried in the order X, Y, Z. None is returned when none of them matches.
    """
    candidates = (ops.X, ops.Y, ops.Z)
    return next(
        (pauli for pauli in candidates if protocols.equal_up_to_global_phase(gate, pauli)),
        None,
    )
def _validate_dd_sequence(dd_sequence: tuple[ops.Gate, ...]) -> None:
"""Validates a given dynamical decoupling sequence.
The sequence should only consist of Pauli gates and is essentially an identity gate.
Args:
dd_sequence: Input dynamical sequence to be validated.
Raises:
ValueError: If dd_sequence is not valid.
"""
if len(dd_sequence) < 2:
raise ValueError('Invalid dynamical decoupling sequence. Expect more than one gates.')
for gate in dd_sequence:
if _pauli_up_to_global_phase(gate) is None:
raise ValueError(
'Dynamical decoupling sequence should only contain gates that are essentially'
' Pauli gates.'
)
matrices = [unitary_protocol.unitary(gate) for gate in dd_sequence]
product = reduce(np.matmul, matrices)
if not protocols.equal_up_to_global_phase(product, np.eye(2)):
raise ValueError(
'Invalid dynamical decoupling sequence. Expect sequence product equals'
f' identity up to a global phase, got {product}.'.replace('\n', ' ')
)
def _parse_dd_sequence(
    schema: str | tuple[ops.Gate, ...],
) -> tuple[tuple[ops.Gate, ...], dict[ops.Gate, ops.Pauli]]:
    """Resolves `schema` into a dynamical decoupling sequence and a gate-to-Pauli map.

    Args:
        schema: Either a schema name, or a custom sequence of gates. A custom sequence is
            validated before use; a named schema is looked up as is.

    Returns:
        A pair `(dd_sequence, pauli_map)`. `pauli_map` sends every gate of the sequence that
        equals a Pauli up to a global phase to that Pauli, and sends X, Y and Z to themselves.

    Raises:
        ValueError: If the schema name is unknown or the custom sequence is invalid.
    """
    if isinstance(schema, str):
        dd_sequence = _get_dd_sequence_from_schema_name(schema)
    else:
        _validate_dd_sequence(schema)
        dd_sequence = schema

    # The sequence may contain gates such as X**-1, so each gate is mapped to the Pauli it
    # is equivalent to.
    pauli_map: dict[ops.Gate, ops.Pauli] = {
        gate: pauli
        for gate in dd_sequence
        if (pauli := _pauli_up_to_global_phase(gate)) is not None
    }
    # The plain Paulis always map to themselves.
    pauli_map.update({pauli: pauli for pauli in (ops.X, ops.Y, ops.Z)})
    return (dd_sequence, pauli_map)
def _is_single_qubit_operation(operation: ops.Operation) -> bool:
return len(operation.qubits) == 1
def _is_single_qubit_gate_moment(moment: Moment) -> bool:
return all(_is_single_qubit_operation(op) for op in moment)
def _is_clifford_op(op: ops.Operation) -> bool:
if op.gate:
return has_unitary(op.gate) and has_stabilizer_effect(op.gate)
return False
def _calc_busy_moment_range_of_each_qubit(circuit: FrozenCircuit) -> dict[ops.Qid, list[int]]:
busy_moment_range_by_qubit: dict[ops.Qid, list[int]] = {
q: [len(circuit), -1] for q in circuit.all_qubits()
}
for moment_id, moment in enumerate(circuit):
for q in moment.qubits:
busy_moment_range_by_qubit[q][0] = min(busy_moment_range_by_qubit[q][0], moment_id)
busy_moment_range_by_qubit[q][1] = max(busy_moment_range_by_qubit[q][1], moment_id)
return busy_moment_range_by_qubit
def _is_insertable_moment(moment: Moment, single_qubit_gate_moments_only: bool) -> bool:
return not single_qubit_gate_moments_only or _is_single_qubit_gate_moment(moment)
def _merge_single_qubit_ops_to_phxz(
q: ops.Qid, operations: tuple[ops.Operation, ...]
) -> ops.Operation:
"""Merges [op1, op2, ...] and returns an equivalent op"""
if len(operations) == 1:
return operations[0]
matrices = [unitary_protocol.unitary(op) for op in reversed(operations)]
product = reduce(np.matmul, matrices)
gate = single_qubit_decompositions.single_qubit_matrix_to_phxz(product) or ops.I
return gate.on(q)
def _calc_pulled_through(moment: Moment, input_pauli_ops: ops.PauliString) -> ops.PauliString:
    """Pulls a Pauli string through the Clifford operations of a moment.

    Calculates the pulled_through such that circuit(input_pauli_ops, moment.clifford_ops) is
    equivalent to circuit(moment.clifford_ops, pulled_through). Operations in `moment` that
    are not Clifford are left out of the calculation.
    """
    clifford_ops: list[ops.Operation] = list(filter(_is_clifford_op, moment.operations))
    return input_pauli_ops.after(clifford_ops)
def _get_stop_qubits(moment: Moment) -> set[ops.Qid]:
stop_pulling_through_qubits: set[ops.Qid] = set()
for op in moment:
if (not _is_clifford_op(op) and not _is_single_qubit_operation(op)) or not has_unitary(
op
): # multi-qubit clifford op or non-mergable op.
stop_pulling_through_qubits.update(op.qubits)
return stop_pulling_through_qubits
def _need_merge_pulled_through(op_at_q: ops.Operation, is_at_last_busy_moment: bool) -> bool:
    """Decides whether a pulled-through Pauli arriving before `op_at_q` must be merged into it.

    Merging is needed only when `op_at_q` is a single-qubit operation with a unitary, and
    either it is non-Clifford or it sits at the last busy moment of its qubit.
    """
    # Only single-qubit operations can absorb the Pauli ...
    if not _is_single_qubit_operation(op_at_q):
        return False
    # ... and only when they have a unitary to multiply with.
    if not has_unitary(op_at_q):
        return False
    # Either non-Clifford or at the last busy moment.
    return is_at_last_busy_moment or not _is_clifford_op(op_at_q)
@transformer_api.transformer
def add_dynamical_decoupling(
    circuit: cirq.AbstractCircuit,
    *,
    context: cirq.TransformerContext | None = None,
    schema: str | tuple[ops.Gate, ...] = 'DEFAULT',
    single_qubit_gate_moments_only: bool = True,
) -> cirq.Circuit:
    """Adds dynamical decoupling gate operations to a given circuit.

    This transformer might add new moments and thus change the structure of the original circuit.

    Args:
        circuit: Input circuit to transform.
        context: `cirq.TransformerContext` storing common configurable options for transformers.
        schema: Dynamical decoupling schema name or a dynamical decoupling sequence.
            If a schema name is given, the predefined sequence for that name is used.
            Otherwise the given tuple of gates is validated and used as a custom sequence.
        single_qubit_gate_moments_only: If set True, dynamical decoupling operation will only be
            added in single-qubit gate moments.

    Returns:
        A copy of the input circuit with dynamical decoupling operations.

    Raises:
        ValueError: If `schema` is an unknown schema name or an invalid custom sequence.
    """
    base_dd_sequence, pauli_map = _parse_dd_sequence(schema)
    orig_circuit = circuit.freeze()

    # For each qubit: [index of its first busy moment, index of its last busy moment].
    busy_moment_range_by_qubit = _calc_busy_moment_range_of_each_qubit(orig_circuit)

    # Stores all the moments of the output circuit chronologically.
    transformed_moments: list[Moment] = []
    # A PauliString that stores the result of 'pulling' Pauli gates past every operation
    # seen so far, i.e. the Paulis pending right before the current moment.
    pulled_through: ops.PauliString = ops.PauliString()
    # Iterator over the gates of the dd sequence, one independent cycle per qubit.
    dd_iter_by_qubits = {q: cycle(base_dd_sequence) for q in circuit.all_qubits()}

    def _update_pulled_through(q: ops.Qid, insert_gate: ops.Gate) -> ops.Operation:
        # Multiplies the Pauli equivalent of `insert_gate` on `q` into `pulled_through` and
        # returns the operation that is to be placed in the output circuit.
        nonlocal pulled_through, pauli_map
        pulled_through *= pauli_map[insert_gate].on(q)
        return insert_gate.on(q)

    # Insert and pull remaining Pauli ops through the whole circuit.
    # General ideas are
    #   * Pull through Clifford gates.
    #   * Stop at multi-qubit non-Clifford ops (and other non-mergable ops).
    #   * Merge to single-qubit non-Clifford ops.
    #   * Insert a new moment if necessary.
    # After pulling through pulled_through at `moment`, we expect a transformation of
    #  (pulled_through, moment) -> (updated_moment, updated_pulled_through) or
    #  (pulled_through, moment) -> (new_moment, updated_moment, updated_pulled_through)
    # Moments structure changes are split into 3 steps:
    #  1, (..., last_moment, pulled_through1, moment, ...)
    #        -> (..., last_moment, new_moment or None, pulled_through2, moment, ...)
    #  2, (..., pulled_through2, moment, ...) -> (..., pulled_through3, updated_moment, ...)
    #  3, (..., pulled_through3, updated_moment, ...)
    #        -> (..., updated_moment, pulled_through4, ...)
    for moment_id, moment in enumerate(orig_circuit.moments):
        # Step 1, insert new_moment if necessary.
        # In detail: stop pulling through for multi-qubit non-Clifford ops or gates without
        # unitary representation (e.g., measure gates). If there are remaining pulled through ops,
        # insert into a new moment before current moment.
        stop_pulling_through_qubits: set[ops.Qid] = _get_stop_qubits(moment)
        new_moment_ops: list[ops.Operation] = []
        for q in stop_pulling_through_qubits:
            # Insert the remaining pulled_through on q; multiplying it into pulled_through
            # again is what clears the pending Pauli on q.
            remaining_pulled_through_gate = pulled_through.get(q)
            if remaining_pulled_through_gate is not None:
                new_moment_ops.append(_update_pulled_through(q, remaining_pulled_through_gate))
            # Reset dd sequence so that q starts from the first gate of the sequence again.
            dd_iter_by_qubits[q] = cycle(base_dd_sequence)
        # Need to insert a new moment before current moment
        if new_moment_ops:
            # Fill insertable idle moments in the new moment using dd sequence. Only qubits
            # that were busy before this moment and are busy at or after it get a gate.
            for q in orig_circuit.all_qubits() - stop_pulling_through_qubits:
                if busy_moment_range_by_qubit[q][0] < moment_id <= busy_moment_range_by_qubit[q][1]:
                    new_moment_ops.append(_update_pulled_through(q, next(dd_iter_by_qubits[q])))
            transformed_moments.append(Moment(new_moment_ops))

        # Step 2, calc updated_moment with insertions / merges.
        updated_moment_ops: set[cirq.Operation] = set()
        for q in orig_circuit.all_qubits():
            op_at_q = moment.operation_at(q)
            remaining_pulled_through_gate = pulled_through.get(q)
            # By default the original operation (possibly None) is kept unchanged.
            updated_op = op_at_q
            if op_at_q is None:  # q is idle in this moment: candidate for insertion
                if not _is_insertable_moment(moment, single_qubit_gate_moments_only):
                    continue
                if (
                    busy_moment_range_by_qubit[q][0] < moment_id < busy_moment_range_by_qubit[q][1]
                ):  # strictly inside the busy range: insert next pauli gate in the dd sequence
                    updated_op = _update_pulled_through(q, next(dd_iter_by_qubits[q]))
                elif (  # insert the remaining pulled through if beyond the ending busy moment
                    moment_id > busy_moment_range_by_qubit[q][1]
                    and remaining_pulled_through_gate is not None
                ):
                    updated_op = _update_pulled_through(q, remaining_pulled_through_gate)
            elif (
                remaining_pulled_through_gate is not None
            ):  # merge pulled-through of q to op_at_q if needed
                if _need_merge_pulled_through(
                    op_at_q, moment_id == busy_moment_range_by_qubit[q][1]
                ):
                    # The pending Pauli is applied first, then the original operation.
                    remaining_op = _update_pulled_through(q, remaining_pulled_through_gate)
                    updated_op = _merge_single_qubit_ops_to_phxz(q, (remaining_op, op_at_q))
            if updated_op is not None:
                updated_moment_ops.add(updated_op)

        # A moment that ends up without any operation is not emitted.
        if updated_moment_ops:
            updated_moment = Moment(updated_moment_ops)
            transformed_moments.append(updated_moment)

            # Step 3, update pulled through.
            # In detail: pulling current `pulled_through` through updated_moment.
            pulled_through = _calc_pulled_through(updated_moment, pulled_through)

    # Insert a new moment if there are remaining pulled-through operations.
    ending_moment_ops = []
    for affected_q, combined_op_in_pauli in pulled_through.items():
        ending_moment_ops.append(combined_op_in_pauli.on(affected_q))
    if ending_moment_ops:
        transformed_moments.append(Moment(ending_moment_ops))

    return Circuit.from_moments(*transformed_moments)