-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcore.py
367 lines (314 loc) · 15.7 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
import numpy as np
from qiskit import Aer, QuantumCircuit
from qiskit.execute_function import execute
import qiskit.quantum_info as qi
from qiskit_machine_learning.neural_networks import SamplerQNN
from qiskit_machine_learning.algorithms import ObjectiveFunction
from qiskit_machine_learning.algorithms import NeuralNetworkRegressor
import scipy
import math
import os
# Change the working directory to this file's directory so that relative
# paths used elsewhere (e.g. by the local `embedding` module) resolve the
# same way regardless of where the process was launched from.
# BUG FIX: the original passed the string literal '__file__' to abspath(),
# which just resolves the name "__file__" against the current working
# directory; the module dunder __file__ is what's intended.
abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)
os.chdir(dname)
import embedding
import copy
class QVAE_NN(SamplerQNN):
    """Quantum variational autoencoder network.

    Holds a parameterized encoder and decoder circuit.  The forward pass
    density-matrix-simulates the encoder on each input state, traces out the
    encoder's auxiliary qubits (and, optionally, the configured "trash"
    qubits) to obtain a latent density matrix, then reattaches fresh |0>
    qubits and runs the decoder to reconstruct the full state.
    """

    def __init__(self, encoder: QuantumCircuit, decoder: QuantumCircuit,
                 num_encoder_params: int, trash_qubits,
                 num_auxiliary_encoder, num_auxiliary_decoder, **kwargs):
        """
        Args:
            encoder: parameterized encoder circuit (copied, not referenced).
            decoder: parameterized decoder circuit (copied, not referenced).
            num_encoder_params: number of leading weights that belong to the
                encoder; the remainder go to the decoder.
            trash_qubits: indices of qubits to trace out of the encoded state
                to form the latent state, or None to keep the full state.
            num_auxiliary_encoder: number of auxiliary qubits appended to the
                encoder register.
            num_auxiliary_decoder: number of auxiliary qubits appended to the
                decoder register.
        """
        super(QVAE_NN, self).__init__(**kwargs)
        self._encoder = encoder.copy()
        self._decoder = decoder.copy()
        self._num_encoder_params = num_encoder_params
        self._trash_qubits = trash_qubits
        self._num_auxiliary_encoder = num_auxiliary_encoder
        self._num_auxiliary_decoder = num_auxiliary_decoder

    def forward(self, input_data, weights):
        """Forward pass of the network.

        Args:
            input_data: sequence of state vectors, each of length 2**n.
            weights: concatenated encoder+decoder parameter values; the first
                ``self._num_encoder_params`` entries bind the encoder.

        Returns:
            Tuple ``(decoder_output, reduced_result)``: the reconstructed
            density matrices and the latent density matrices, one per sample.
        """
        num_encoder_params = self._num_encoder_params
        encoder_weights = weights[:num_encoder_params]
        decoder_weights = weights[num_encoder_params:]
        trash_qubits = self._trash_qubits
        num_samples = len(input_data)

        # Number of data qubits follows from the state-vector dimension.
        n_qubit = math.log2(len(input_data[0]))
        assert int(n_qubit) == n_qubit
        n_qubit = int(n_qubit)
        original_encoder = self._encoder.copy('original_e')
        my_encoder = original_encoder.assign_parameters(encoder_weights)

        ### Encoder (DensityMatrix simulation)
        result_tmp = []
        for i in range(num_samples):
            # Auxiliary encoder register starts in |0...0><0...0|.
            num_aux_en = 2 ** self._num_auxiliary_encoder
            aux_state_en = np.zeros((num_aux_en, num_aux_en))
            aux_state_en[0][0] = 1
            auxiliary_qubits_en = qi.DensityMatrix(aux_state_en)
            my_state = qi.DensityMatrix(input_data[i])
            my_state = my_state.tensor(auxiliary_qubits_en)
            my_state = my_state.evolve(my_encoder)
            result_tmp.append(my_state)

        # Trace out the encoder's auxiliary qubits and renormalize.
        result = []
        for state in result_tmp:
            auxiliary_qubits_e = range(n_qubit, n_qubit + self._num_auxiliary_encoder)
            tmp_state = qi.partial_trace(state, auxiliary_qubits_e)
            trace = tmp_state.trace()
            tmp_state = tmp_state / trace
            result.append(tmp_state)  # reduced output by tracing out auxiliary qubits

        # Optionally trace out the trash qubits to obtain the latent state.
        if trash_qubits is None:
            reduced_result = result
        else:
            reduced_result = []
            for quantum_state in result:
                reduced_state = qi.partial_trace(quantum_state, trash_qubits)
                trace = reduced_state.trace()
                reduced_state = reduced_state / trace
                reduced_result.append(reduced_state)  # reduced output by tracing out trash qubits

        ### Decoder
        # Fresh |0> qubits stand in for the traced-out trash qubits.
        # BUG FIX: the original unconditionally evaluated len(trash_qubits),
        # which raised TypeError whenever trash_qubits was None even though
        # the encoder path above explicitly supports that case.
        if trash_qubits is not None:
            num_trash = 2 ** len(trash_qubits)
            zero_state = np.zeros((num_trash, num_trash))
            zero_state[0][0] = 1
            reconstruction_qubits = qi.DensityMatrix(zero_state)
        else:
            reconstruction_qubits = None
        num_aux = 2 ** self._num_auxiliary_decoder
        aux_state = np.zeros((num_aux, num_aux))
        aux_state[0][0] = 1
        auxiliary_qubits = qi.DensityMatrix(aux_state)
        original_decoder = self._decoder.copy('original_d')
        qc_d = original_decoder.assign_parameters(decoder_weights, inplace=False)
        decoder_output = []
        for item in reduced_result:
            latent_state = qi.DensityMatrix(item)
            if reconstruction_qubits is not None:
                latent_state = latent_state.tensor(reconstruction_qubits)
            latent_full = latent_state.tensor(auxiliary_qubits)
            # Evolve through the decoder, then trace out its auxiliary qubits.
            quantum_state = latent_full.evolve(qc_d)
            auxiliary_qubits_d = range(n_qubit, n_qubit + self._num_auxiliary_decoder)
            tmp_state = qi.partial_trace(quantum_state, auxiliary_qubits_d)
            trace = tmp_state.trace()
            tmp_state = tmp_state / trace
            decoder_output.append(tmp_state)
        return decoder_output, reduced_result
class QVAE_trainer(NeuralNetworkRegressor):
    """Regressor that trains a QVAE_NN with a beta-weighted objective:
    reconstruction loss plus ``beta`` times a latent-state regularizer.
    """

    def __init__(self, beta, regularizer_type, reconstruction_loss='fidelity', **kwargs):
        """
        Args:
            beta: weight of the latent regularization term.
            regularizer_type: latent divergence name ('KLD', 'JSD',
                'fidelity' or 'wasserstein').
            reconstruction_loss: reconstruction loss name ('fidelity',
                'wasserstein', 'cross_entropy' or 'JSD').
        """
        super(QVAE_trainer, self).__init__(**kwargs)
        self._reconstruction_loss = reconstruction_loss
        self._beta = beta
        self._regularizer_type = regularizer_type

    def _fit_internal(self, X: np.ndarray, y: np.ndarray):
        # Build the density-matrix objective and delegate to the optimizer.
        function: ObjectiveFunction = None
        function = DensityMatrix_ObjectiveFunction(
            X=X, y=y, neural_network=self._neural_network, loss=self._loss,
            reconstruction_loss=self._reconstruction_loss, beta=self._beta,
            regularizer_type=self._regularizer_type)
        return self._minimize(function)

    def score_fidelity(self, X, y):
        """Mean state fidelity between predicted and target states."""
        y_pred = self.predict(X)[0]
        fidelity_score = 0
        for i, j in zip(y_pred, y):
            fidelity_score = fidelity_score + qi.state_fidelity(i, j, validate=True)
        return fidelity_score / len(y)

    def score(self, X, y, loss_type):
        """Score predictions against targets (higher is better).

        Args:
            X: input data fed to ``predict``.
            y: target states.
            loss_type: 'fidelity', 'wasserstein', 'cross_entropy' or 'JSD'.

        Raises:
            ValueError: if ``loss_type`` is not recognized.
        """
        y_pred = self.predict(X)[0]
        if loss_type == 'fidelity':
            score = 0
            for i, j in zip(y_pred, y):
                score = score + qi.state_fidelity(i, j, validate=True)
            return score / len(y)
        elif loss_type == 'wasserstein':
            num_data = len(X)
            dim = len(X[0])
            # SWAP operator over the doubled register, obtained from the
            # unitary simulator; cost matrix C = (I - SWAP)/2.
            n_qubits_latent = int(2 * math.log2(dim))
            swap_circuit = QuantumCircuit(n_qubits_latent, 0)
            for i in range(int(n_qubits_latent * 0.5)):
                swap_circuit.swap(i, int(n_qubits_latent * 0.5) + i)
            backend = Aer.get_backend('unitary_simulator')
            job = execute(swap_circuit, backend, shots=100)
            result = job.result()
            swap = result.get_unitary(swap_circuit, 3).data
            identity_matrix = np.identity(dim * dim)
            C = 0.5 * (identity_matrix - swap)
            cost = 0
            pi_state = np.zeros((len(X[0]) * len(X[0]), len(X[0]) * len(X[0])))
            for v, m in zip(y, y_pred):
                input_densityMatrix = qi.DensityMatrix(v)
                in_out_state = qi.DensityMatrix(m).expand(input_densityMatrix)
                pi_state = (pi_state + in_out_state.data * 1. / num_data)
            cost = np.trace(np.matmul(pi_state, C))
            return -cost.real
        elif loss_type == 'cross_entropy':
            score = 0
            for v, m in zip(y, y_pred):
                input_state_entropy = qi.entropy(v)
                # BUG FIX: the original computed logm(m / log 2), i.e. the log
                # of a rescaled matrix.  Converting the matrix logarithm to
                # base 2 means dividing the *result* by log(2), matching the
                # other base-2 conversions in this module.
                log_state = qi.DensityMatrix(scipy.linalg.logm(m) / np.log(2.0))
                relative_entropy = -qi.DensityMatrix(np.dot(v, log_state)).trace() - input_state_entropy
                score += relative_entropy.real
            return -score / len(y)
        elif loss_type == 'JSD':
            score = 0
            for v, m in zip(y, y_pred):
                # JSD(a||b) = 0.5*KL(a||M) + 0.5*KL(b||M) with M = (a+b)/2.
                M_state = 0.5 * (v.data + m.data)
                log_M = qi.DensityMatrix(scipy.linalg.logm(M_state) / np.log(2.0))  # in base 2
                relative_entropy = (-qi.DensityMatrix(np.dot(v, log_M)).trace() - qi.entropy(v)
                                    - qi.DensityMatrix(np.dot(m, log_M)).trace() - qi.entropy(m)) * 0.5
                score += relative_entropy.real
            return -score * 1. / len(y)
        else:
            raise ValueError('reconstruction loss type not recognized')
class StateVector_ObjectiveFunction(ObjectiveFunction):
    """Objective function over state-vector network outputs: mean loss,
    with an analytic gradient built from the network's backward pass.
    """

    def objective(self, weights: np.ndarray) -> float:
        """Return the sample-averaged loss of the forward output vs. targets."""
        # Forward output is of shape (N, num_outputs).
        predictions = self._neural_network_forward(weights)[0]
        total = sum(self._loss(predictions, self._y))
        return total / self._num_samples

    def gradient(self, weights: np.ndarray) -> np.ndarray:
        """Return the loss gradient w.r.t. the weights, averaged over samples."""
        # Weight probability gradient has shape (N, num_outputs, num_weights).
        _, weight_prob_grad = self._neural_network.backward(self._X, weights)
        predictions = self._neural_network_forward(weights)[0]
        grad = np.zeros((1, self._neural_network.num_weights))
        for out_idx in range(self._neural_network.output_shape[0]):
            per_sample_loss = self._loss(predictions[:, out_idx], self._y[:, out_idx])
            grad += weight_prob_grad[:, out_idx, :].T @ per_sample_loss
        return grad / self._num_samples
class DensityMatrix_ObjectiveFunction(ObjectiveFunction):
    """Objective for density-matrix outputs: reconstruction loss plus
    ``beta`` times a divergence between the latent state and the maximally
    mixed state.
    """

    def __init__(self, reconstruction_loss, beta, regularizer_type, **kwargs):
        """
        Args:
            reconstruction_loss: 'wasserstein', 'cross_entropy', 'fidelity'
                or 'JSD'.
            beta: weight of the latent regularizer in ``objective``.
            regularizer_type: 'KLD', 'JSD', 'fidelity' or 'wasserstein'.
        """
        super(DensityMatrix_ObjectiveFunction, self).__init__(**kwargs)
        self._reconstruction_loss = reconstruction_loss
        self._beta = beta
        self._regularizer_type = regularizer_type

    def _swap_cost_matrix(self, dim):
        """Return the Wasserstein cost matrix C = (I - SWAP)/2.

        SWAP acts on the doubled register of 2*log2(dim) qubits and is
        obtained from the unitary simulator (rounded to 3 decimals, as in
        the original implementation).  Previously this construction was
        duplicated in two methods.
        """
        n_qubits_latent = int(2 * math.log2(dim))
        swap_circuit = QuantumCircuit(n_qubits_latent, 0)
        for i in range(int(n_qubits_latent * 0.5)):
            swap_circuit.swap(i, int(n_qubits_latent * 0.5) + i)
        backend = Aer.get_backend('unitary_simulator')
        job = execute(swap_circuit, backend, shots=100)
        result = job.result()
        swap = result.get_unitary(swap_circuit, 3).data
        identity_matrix = np.identity(dim * dim)
        return 0.5 * (identity_matrix - swap)

    def reconstruction_loss(self, matrix, vector):
        """Reconstruction loss between predictions ``matrix`` and targets
        ``vector`` (lower is better; fidelity is negated accordingly).

        Raises:
            ValueError: if the configured loss name is not recognized.
        """
        if self._reconstruction_loss == 'wasserstein':
            num_data = len(vector)
            dim = len(vector[0])
            C = self._swap_cost_matrix(dim)
            cost = 0
            pi_state = np.zeros((len(vector[0]) * len(vector[0]), len(vector[0]) * len(vector[0])))
            for v, m in zip(vector, matrix):
                input_densityMatrix = qi.DensityMatrix(v)
                in_out_state = qi.DensityMatrix(m).expand(input_densityMatrix)
                pi_state = (pi_state + in_out_state.data * 1. / num_data)
            cost = np.trace(np.matmul(pi_state, C))
            return cost.real
        else:
            if self._reconstruction_loss == 'cross_entropy':
                # NOTE: renamed the accumulator (the original shadowed the
                # builtin `sum`); same applies in the branches below.
                total = 0
                for v, m in zip(vector, matrix):
                    input_state_entropy = qi.entropy(v)
                    log_state = qi.DensityMatrix(scipy.linalg.logm(m) / np.log(2.0))
                    relative_entropy = -qi.DensityMatrix(np.dot(v, log_state)).trace() - input_state_entropy
                    total += relative_entropy.real
                return total * 1. / len(vector)
            elif self._reconstruction_loss == 'fidelity':
                total = 0
                for v, m in zip(vector, matrix):
                    current_fidelity = qi.state_fidelity(m, v, validate=True)
                    total += current_fidelity
                return -total * 1. / len(vector)
            elif self._reconstruction_loss == 'JSD':
                total = 0
                for v, m in zip(vector, matrix):
                    # JSD(a||b) = 0.5*KL(a||M) + 0.5*KL(b||M), M = (a+b)/2.
                    M_state = 0.5 * (v.data + m.data)
                    log_M = scipy.linalg.logm(M_state) / np.log(2.0)  # in base 2
                    relative_entropy = (-qi.DensityMatrix(np.dot(v, log_M)).trace() - qi.entropy(v)
                                        - qi.DensityMatrix(np.dot(m, log_M)).trace() - qi.entropy(m)) * 0.5
                    total += relative_entropy.real
                return total * 1. / len(vector)
            else:
                raise ValueError('reconstruction loss type not recognized')

    def quantum_relative_entropy(self, latent):
        """Divergence between each latent state and the maximally mixed
        state, averaged over the batch (regularization term).

        Raises:
            ValueError: if the configured regularizer name is not recognized.
        """
        type_regularizer = self._regularizer_type
        dim = latent[0].dim
        max_mixed_state = np.diag(np.full(dim, 1 / dim))
        ## analogy KL divergence
        if type_regularizer == 'KLD':
            entropy_loss = 0
            for state in latent:
                max_entropy = qi.entropy(max_mixed_state)  ## log base 2 by default
                log_state = scipy.linalg.logm(state) / np.log(2.0)  # in base 2
                # KL(a||b): a is the maximally mixed state, b is the latent.
                relative_entropy = -qi.DensityMatrix(np.dot(max_mixed_state, log_state)).trace() - max_entropy
                entropy_loss = entropy_loss + relative_entropy.real
            return entropy_loss * 1. / len(latent)
        ## analogy Jensen-Shannon divergence
        elif type_regularizer == 'JSD':
            entropy_loss = 0
            for state in latent:
                M_state = 0.5 * (state + max_mixed_state)
                log_M = scipy.linalg.logm(M_state) / np.log(2.0)  # in base 2
                my_entropy = qi.entropy(state)
                max_entropy = qi.entropy(max_mixed_state)
                # JSD(a||b) = 0.5*KL(a||M) + 0.5*KL(b||M), M = 0.5a + 0.5b.
                relative_entropy = (-qi.DensityMatrix(np.dot(state, log_M)).trace() - my_entropy
                                    - qi.DensityMatrix(np.dot(max_mixed_state, log_M)).trace() - max_entropy) * 0.5
                entropy_loss = entropy_loss + relative_entropy.real
            return entropy_loss * 1. / len(latent)
        elif type_regularizer == 'fidelity':
            entropy_loss = 0
            for state in latent:
                entropy_loss = entropy_loss + qi.state_fidelity(state, max_mixed_state, validate=True)
            return -entropy_loss * 1. / len(latent)
        elif type_regularizer == 'wasserstein':  # this definition of \pi is bad for two mixed states.
            dim = latent[0].dim
            max_mixed_state = np.diag(np.full(dim, 1 / dim))
            C = self._swap_cost_matrix(dim)
            cost = 0
            for state in latent:
                pi_state = (qi.DensityMatrix(max_mixed_state).expand(state)).data
                cost = cost + np.trace(np.matmul(pi_state, C)) / len(latent)
            return cost.real
        else:
            raise ValueError('divergence type not recognized')

    def objective(self, weights: np.ndarray) -> float:
        """Total objective: reconstruction loss + beta * latent regularizer.

        Both terms are already normalized per sample.
        """
        # Forward returns (decoder_output, reduced_result).
        forward_result = self._neural_network_forward(weights)
        output = forward_result[0]
        latent = forward_result[1]
        val_1 = self.reconstruction_loss(matrix=output, vector=self._y)
        val_2 = self.quantum_relative_entropy(latent=latent)
        val = val_1 + self._beta * val_2  # minimize
        return val

    def gradient(self, weights: np.ndarray) -> np.ndarray:
        # Gradient-free optimizers only; analytic gradient is not provided.
        raise NotImplementedError
def encoder(n_layers, n_qubit, theta_params, num_auxiliary_encoder):
    """Build the encoder circuit over data + auxiliary qubits, filled with
    the Ising-interaction ansatz from the local ``embedding`` module."""
    circuit = QuantumCircuit(n_qubit + num_auxiliary_encoder)
    embedding.ising_interaction_noInput(
        circuit, theta_params, n_layers, n_qubit, num_auxiliary_encoder)
    return circuit
def decoder(n_layers, n_qubit, theta_params, num_auxiliary_decoder):
    """Build the decoder circuit over data + auxiliary qubits, filled with
    the Ising-interaction ansatz from the local ``embedding`` module."""
    circuit = QuantumCircuit(n_qubit + num_auxiliary_decoder)
    embedding.ising_interaction_noInput(
        circuit, theta_params, n_layers, n_qubit, num_auxiliary_decoder)
    return circuit