Authors: Linsen Li, Pratyush Anand
Given the pre-characterization information for a set of qubits, we can predict the best performance (measured by quantum volume) that such a qubit system can reach.
The framework contains two parts: RL agent 1, which uses a DNN to optimize the physical control parameters for the highest-fidelity control, and RL agent 2, which uses a transformer architecture to output the action matrix for dynamic scheduling strategy optimization. With more training data, the simulation environment in RL agent 1 can itself be learned by a neural network, so the two RL agents can merge into a single end-to-end neural network instead of two separate networks.
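The merged end-to-end network is not implemented in this notebook; as a minimal sketch of the idea (all layer sizes and names below are illustrative assumptions, not the authors' code), the simulation environment of RL agent 1 could be approximated by a small supervised surrogate network that regresses the Bell-state error from the 8 control and 14 physical parameters:
import torch
import torch.nn as nn
# Hypothetical surrogate of the physics simulator: 22 parameters (x1, x2, e1, e2) -> predicted Bell-state error
surrogate = nn.Sequential(nn.Linear(22, 64), nn.ReLU(), nn.Linear(64, 64), nn.ReLU(), nn.Linear(64, 1))
surrogate_opt = torch.optim.Adam(surrogate.parameters(), lr=1e-3)
def surrogate_step(params_batch, err_batch):  # one supervised update on (parameters, error) pairs produced by the simulator
    pred = surrogate(params_batch)  # predicted Bell-state error
    loss = nn.functional.mse_loss(pred, err_batch)
    surrogate_opt.zero_grad()
    loss.backward()
    surrogate_opt.step()
    return loss.item()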
import numpy as np # mathematical computing
import matplotlib.pyplot as plt # plot the result
import random # random variable generation
import time # time counting
from qutip import * # quantum simulation
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.env_util import make_vec_env
import gym
from gym import spaces
import multiprocessing
from multiprocessing import Process, Queue
Four-level atomic system. The colors show the operators: blue, red, and orange represent the spin-conserving, spin-flipping, and MW transitions, respectively.
cost_history = []
Na = 4 # 4 atomic levels
ground_down = basis(Na, 0)
ground_up = basis(Na, 1)
excited_up = basis(Na, 2)
excited_down = basis(Na, 3)
N = 2 # Set where to truncate Fock state of cavity
sigma_A_gd_ed = tensor(qeye(N), ground_down * excited_down.dag(), qeye(N), qeye(Na)) # |g_down><e_down| of system A
sigma_B_gd_ed = tensor(qeye(N), qeye(Na), qeye(N), ground_down * excited_down.dag()) # |g_down><e_down| of system B
sigma_A_gu_eu = tensor(qeye(N), ground_up * excited_up.dag(), qeye(N), qeye(Na)) # |g_up><e_up| of system A
sigma_B_gu_eu = tensor(qeye(N), qeye(Na), qeye(N), ground_up * excited_up.dag()) # |g_up><e_up| of system B
sigma_A_gd_eu = tensor(qeye(N), ground_down * excited_up.dag(), qeye(N), qeye(Na)) # |g_down><e_up| of system A
sigma_B_gd_eu = tensor(qeye(N), qeye(Na), qeye(N), ground_down * excited_up.dag()) # |g_down><e_up| of system B
sigma_A_gu_ed = tensor(qeye(N), ground_up * excited_down.dag(), qeye(N), qeye(Na)) # |g_up><e_down| of system A
sigma_B_gu_ed = tensor(qeye(N), qeye(Na), qeye(N), ground_up * excited_down.dag()) # |g_up><e_down| of system B
sigma_A_gd_gu = tensor(qeye(N), ground_down * ground_up.dag(), qeye(N), qeye(Na)) # |g_down><g_up| of system A
sigma_B_gd_gu = tensor(qeye(N), qeye(Na), qeye(N), ground_down * ground_up.dag()) # |g_down><g_up| of system B
a_A = tensor(destroy(N), qeye(Na), qeye(N), qeye(Na)) # annihilation operator of system A
a_B = tensor(qeye(N), qeye(Na), destroy(N), qeye(Na)) # annihilation operator of system B
The cost function takes the control parameters x1 and x2 for the two qubits and the physical system parameters e1 and e2, and returns its output through result_list.
Quantum Monte Carlo Simulation visualization for the Barrett-Kok Protocol
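As a minimal sketch of how a quantum Monte Carlo run can be visualized (a single emitter-cavity toy model with assumed parameters, not the two-node Barrett-Kok system defined below), one can plot the trajectory-averaged cavity photon number returned by mcsolve:
demo_N = 2 # truncated cavity Fock space for the toy model
a_demo = tensor(destroy(demo_N), qeye(2)) # cavity annihilation operator
sm_demo = tensor(qeye(demo_N), basis(2, 0) * basis(2, 1).dag()) # atomic lowering operator
g_demo, kappa_demo, gamma_demo = 1.0, 0.5, 0.1 # assumed coupling and decay rates
H_demo = g_demo * (a_demo.dag() * sm_demo + sm_demo.dag() * a_demo) # Jaynes-Cummings interaction
c_ops_demo = [np.sqrt(kappa_demo) * a_demo, np.sqrt(gamma_demo) * sm_demo] # cavity decay and spontaneous emission
psi0_demo = tensor(basis(demo_N, 0), basis(2, 1)) # cavity empty, atom excited
t_demo = np.linspace(0.0, 10.0, 200)
traj_demo = mcsolve(H_demo, psi0_demo, t_demo, c_ops_demo, [a_demo.dag() * a_demo], ntraj=50, progress_bar=False)
plt.plot(t_demo, traj_demo.expect[0]) # trajectory-averaged cavity photon number
plt.xlabel('time')
plt.ylabel('<n_cavity>')
plt.show()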
def my_cost_function(x1, x2, e1, e2, result_list):
### environment physical parameters
gamma1 = e1[0] # Spontaneous decay rate
K_dep1 = e1[1] # Dephasing rate
K_c1 = e1[2] # Out-coupling rate
cyc_1 = e1[3] # Cyclicity of the non-spin-conserving transition
kappa1 = e1[4] # Cavity coupling rate
eta1 = e1[5] # Photon detection efficiency
f1 = e1[6] # Random spectral diffusion deviation estimated with HMM model
gamma2 = e2[0] # Spontaneous decay rate
K_dep2 = e2[1] # Dephasing rate
K_c2 = e2[2] # Out-coupling rate
cyc_2 = e2[3] # Cyclicity of the non-spin-conserving transition
kappa2 = e2[4] # Cavity coupling rate
eta2 = e2[5] # Photon detection efficiency
f2 = e2[6] # Random spectral diffusion deviation estimated with HMM model
### control parameters
g1 = x1[0] # Cavity-atom coupling
w_cav_1 = x1[1] # Cavity - atom detuning
life_time1 = x1[2] # First laser control gaussian width
sig1 = x1[3] # Second laser control gaussian width
g2 = x2[0] # Cavity-atom coupling
w_cav_2 = x2[1] # Cavity - atom detuning
life_time2 = x2[2] # First laser control gaussian width
sig2 = x2[3] # Second laser control gaussian width
flag = 0 # Flag confirming that a photon count was received within the QMC simulation event
while flag == 0: # If no count is received, flag stays 0 and the computation restarts
c_ops = [] # Build collapse operators
QE = 0.8 # Quantum efficiency (using SnV- as the example around 80%)
DW = 0.4 # Debye-Waller factor (using SnV- as the example, around 40%)
Fp1 = (1 + 8 * g1 ** 2 / (K_c1 * gamma1)) / (np.sqrt(1 + (w_cav_1 / K_c1) ** 2)) # Purcell factor calculation: Fp = 1 + 2 * C, C = 4 * g ** 2 / (kappa * gamma)
Fp2 = (1 + 8 * g2 ** 2 / (K_c2 * gamma2)) / (np.sqrt(1 + (w_cav_2 / K_c2) ** 2)) # Purcell factor calculation: Fp = 1 + 2 * C, C = 4 * g ** 2 / (kappa * gamma)
ZPL1 = eta1 * QE * (DW * Fp1) / (1 + DW * (Fp1 - 1)) # ZPL collection efficiency for detector 1
ZPL2 = eta2 * QE * (DW * Fp2) / (1 + DW * (Fp2 - 1)) # ZPL collection efficiency for detector 2
c_ops.append(np.sqrt(kappa1) * a_A) # Cavity decay of system A. C0
c_ops.append(np.sqrt(kappa2) * a_B) # Cavity decay of system B. C1
c_ops.append(np.sqrt(gamma1) * sigma_A_gd_ed) # Spontaneous decay (to modes other than the cavity) from |e_down> to |g_down> of system A. C2
c_ops.append(np.sqrt(gamma2) * sigma_B_gd_ed) # Spontaneous decay (to modes other than the cavity) from |e_down> to |g_down> of system B. C3
c_ops.append(np.sqrt(gamma1) * sigma_A_gu_eu) # Spontaneous decay (to modes other than the cavity) from |e_up> to |g_up> of system A. C4
c_ops.append(np.sqrt(gamma2) * sigma_B_gu_eu) # Spontaneous decay (to modes other than the cavity) from |e_up> to |g_up> of system B. C5
c_ops.append(np.sqrt(gamma1 / cyc_1) * sigma_A_gu_ed) # Spin-flipping spontaneous decay from |e_down> to |g_up> of system A. C6
c_ops.append(np.sqrt(gamma2 / cyc_2) * sigma_B_gu_ed) # Spin-flipping spontaneous decay from |e_down> to |g_up> of system B. C7
c_ops.append(np.sqrt(gamma1 / cyc_1) * sigma_A_gd_eu) # Spin-flipping spontaneous decay from |e_up> to |g_down> of system A. C8
c_ops.append(np.sqrt(gamma2 / cyc_2) * sigma_B_gd_eu) # Spin-flipping spontaneous decay from |e_up> to |g_down> of system B. C9
c_ops.append(np.sqrt(K_c1) * (a_A + a_B) / np.sqrt(2)) # Collapsing to detector 1 C10 (Real photon detection)
c_ops.append(np.sqrt(K_c2) * (a_A - a_B) / np.sqrt(2)) # Collapsing to detector 2 C11 (Real photon detection)
c_ops.append(np.sqrt(K_dep1) * (sigma_A_gd_ed.dag() * sigma_A_gd_ed - sigma_A_gd_ed * sigma_A_gd_ed.dag())) # Dephasing collapse operator of system A (dephasing modeling). C12
c_ops.append(np.sqrt(K_dep2) * (sigma_B_gd_ed.dag() * sigma_B_gd_ed - sigma_B_gd_ed * sigma_B_gd_ed.dag())) # Dephasing collapse operator of system B (dephasing modeling). C13
time_scale = 200 # Time step number in the program
t = np.linspace(0.0, 4.0, time_scale) # Define time vector
df_s = max(f1, f2) # Maximum averaged equivalent spectral diffusion between the two qubits
w_a = -df_s / 2 # Equivalent spectral detuning for system A
w_b = df_s / 2 # Equivalent spectral detuning for system B
# Step 1
theta = np.pi / 4 # Initial phase
photon = basis(N, 0) # Initial photonic state
spin = ground_down * np.cos(theta) + ground_up * np.sin(theta) # Initial spin state
psi0 = tensor(photon, spin, photon, spin) # Initial global state
H0_A = w_cav_1 * a_A.dag() * a_A - g1 * (sigma_A_gd_ed.dag() * a_A+a_A.dag() * sigma_A_gd_ed) + w_a * (sigma_A_gd_ed.dag() * sigma_A_gd_ed - sigma_A_gd_ed * sigma_A_gd_ed.dag()) # Time-independent Hamiltonian of system A
H0_B = w_cav_2 * a_B.dag() * a_B - g2 * (sigma_B_gd_ed.dag() * a_B+a_B.dag() * sigma_B_gd_ed) + w_b * (sigma_B_gd_ed.dag() * sigma_B_gd_ed - sigma_B_gd_ed * sigma_B_gd_ed.dag()) # Time-independent Hamiltonian of system B
H0 = H0_A + H0_B # Time-independent Hamiltonian of the global system
# Here we define the excitation Hamiltonian
H1_A = (sigma_A_gd_ed.dag() + sigma_A_gd_ed) # Time-dependent Hamiltonian of system A after semi-classical approximation
H1_B = (sigma_B_gd_ed.dag() + sigma_B_gd_ed) # Time-dependent Hamiltonian of system B after semi-classical approximation
numb = 50 # Number of trajectories
# Excitation pulse parameters
center = 0.5 # The first excitation pulse center
peak1 = np.sqrt(np.pi) / 2 / life_time1 # Gaussian normalized peak
peak2 = np.sqrt(np.pi) / 2 / life_time2 # Gaussian normalized peak
excite_pulse1 = peak1 * np.exp( - ((t - center) / life_time1) ** 2) # Excitation pulse 1 for H1_A
excite_pulse2 = peak2 * np.exp( - ((t - center) / life_time2) ** 2) # Excitation pulse 2 for H1_B
#Step 2
H = [H0, [H1_A, excite_pulse1], [H1_B, excite_pulse2]] # H changed with the corresponding Hamiltonian
output = mcsolve(H, psi0, t, c_ops, [], ntraj = numb, progress_bar=False) # Output from QMCS
# Here we define the excitation Hamiltonian of the microwave pi pulse applied after the first trial
center2 = 2 # The second excitation pulse center
excite_pulse3 = np.pi / 2 * (np.exp( - (t - center2) ** 2 / sig1 ** 2 / 2)/ sig1 / np.sqrt(2 * np.pi)) # Excitation pulse 3 for H2_A
excite_pulse4 = np.pi / 2 * (np.exp( - (t - center2) ** 2 / sig2 ** 2 / 2)/ sig2 / np.sqrt(2 * np.pi)) # Excitation pulse 4 for H2_B
H2_A = (sigma_A_gd_gu.dag() + sigma_A_gd_gu) # Time-dependent Hamiltonian of system A after semi-classical approximation
H2_B = (sigma_B_gd_gu.dag() + sigma_B_gd_gu) # Time-dependent Hamiltonian of system B after semi-classical approximation
H_second = [[H2_A, excite_pulse3], [H2_B, excite_pulse4]] # H_second changed with the corresponding Hamiltonian
def run_second_mcsolve(final_state, first_run_detector, rhos, counts):
#Step 4
# Apply the pi pulse to flip the two spins, which is equivalent to applying a deterministic X gate
result0 = mesolve(H_second, final_state, t) # ME solver provides the final state result
psi0 = result0.states[-1] # Get the result state wave function
#Step 5
result = mcsolve(H, psi0, t, c_ops, [], ntraj = numb, progress_bar=False) # Output from QMCS
for i in range(numb):
#For all trajectories, check if either detector fired, and if so add the final state to the appropriate index in rhos
if 10 in result.col_which[i]:
if random.random() < ZPL1: # Detector efficiency post-selection
rho_num = 0 if first_run_detector == 10 else 2
rhos[rho_num] += result.states[i][-1].ptrace([1, 3])
counts[rho_num] += 1
if 11 in result.col_which[i]:
if random.random() < ZPL2: # Detector efficiency post-selection
rho_num = 1 if first_run_detector == 10 else 3
rhos[rho_num] += result.states[i][-1].ptrace([1, 3])
counts[rho_num] += 1
#We store the cumulative final states for each combination of detector events in the order [D1->D1, D1->D2, D2->D1, D2->D2]
#We will then divide each of these cumulative states by counts to find the average final state for each combination
rhos = [0*output.states[0][0].ptrace([1,3]) for _ in range(4)]
counts = [0, 0, 0, 0]
for i in range(numb):
#Step 3
if 10 in output.col_which[i]:
if random.random() < ZPL1: # Detector efficiency post-selection
run_second_mcsolve(output.states[i][-1], 10, rhos, counts)
if 11 in output.col_which[i]:
if random.random() < ZPL2: # Detector efficiency post-selection
run_second_mcsolve(output.states[i][-1], 11, rhos, counts)
if all(k > 0 for k in counts): # If all the counts have values in the QMC simulation, continue processing (otherwise we would have a 0 in the denominator)
flag = 1 # Stop the while loop and calculate the result
rhos = [rhos[i] / counts[i] for i in range(4)] # Density matrix result
succ_prob = [counts[i] / numb ** 2 for i in range(4)] # Success probability result
Bell1 = tensor(ground_down, ground_up) / np.sqrt(2) + tensor(ground_up, ground_down) / np.sqrt(2) # Bell state from the output
Bell2 = tensor(ground_down, ground_up) / np.sqrt(2) - tensor(ground_up, ground_down) / np.sqrt(2) # Bell state from the output
e1 = 1 - fidelity(rhos[0], Bell1) * np.exp(-1 / (2e4 * succ_prob[0]))
e2 = 1 - fidelity(rhos[1], Bell1) * np.exp(-1 / (2e4 * succ_prob[1]))
e3 = 1 - fidelity(rhos[2], Bell1) * np.exp(-1 / (2e4 * succ_prob[2]))
e4 = 1 - fidelity(rhos[3], Bell1) * np.exp(-1 / (2e4 * succ_prob[3]))
err = min(e1,e2,e3,e4)
cost_history.append(err) # Append the error to cost_history
result_list.put([err]) # Output the error through result_list (for parallel computing on different CPU cores)
return err
If we want to change the optimization target, simply change the scaling factors in the reward function or the initial values in the RL environment.
Given 4 random qubits, where qubits 1 and 2 are entangled and qubits 3 and 4 are entangled, we would like to find the best next step to fuse these two entangled pairs, together with the corresponding control parameters.
def reward(x0):
results = [] # Result for store
processes = [] # Parallel computing worker
result_list = Queue() # Parallel computing result sync
# We initialize all values to 10 in the RL environment, so we need corresponding scaling factors to map them to the real values fed into the simulator
g1 = x0[0]
w_cav_1 = x0[1] * 0
life_time1 = x0[2] * 4e-3
sig1 = x0[3] * 4e-2
g2 = x0[4]
w_cav_2 = x0[5] * 0
life_time2 = x0[6] * 4e-3
sig2 = x0[7] * 4e-2
g3 = x0[8]
w_cav_3 = x0[9] * 0
life_time3 = x0[10] * 4e-3
sig3 = x0[11] * 4e-2
g4 = x0[12]
w_cav_4 = x0[13] * 0
life_time4 = x0[14] * 4e-3
sig4 = x0[15] * 4e-2
x1 = [g1, w_cav_1, life_time1, sig1] # Control parameter list xc1
x2 = [g2, w_cav_2, life_time2, sig2] # Control parameter list xc2
x3 = [g3, w_cav_3, life_time3, sig3] # Control parameter list xc3
x4 = [g4, w_cav_4, life_time4, sig4] # Control parameter list xc4
f1 = x0[16] * 0.1
f2 = x0[17] * 0.1
f3 = x0[18] * 0.1
f4 = x0[19] * 0.1
gamma1 = x0[20] * 0.1
K_dep1 = x0[21] * 0.001
K_c1 = x0[22]
cyc_1 = x0[23] * 1000
kappa1 = x0[24] * 0.01
eta1 = x0[25] * 0.1
gamma2 = x0[26] * 0.1
K_dep2 = x0[27] * 0.001
K_c2 = x0[28]
cyc_2 = x0[29] * 1000
kappa2 = x0[30] * 0.01
eta2 = x0[31] * 0.1
gamma3 = x0[32] * 0.1
K_dep3 = x0[33] * 0.001
K_c3 = x0[34]
cyc_3 = x0[35] * 1000
kappa3 = x0[36] * 0.01
eta3 = x0[37] * 0.1
gamma4 = x0[38] * 0.1
K_dep4 = x0[39] * 0.001
K_c4 = x0[40]
cyc_4 = x0[41] * 1000
kappa4 = x0[42] * 0.01
eta4 = x0[43] * 0.1
e1 = [gamma1, K_dep1, K_c1,cyc_1, kappa1, eta1, f1] # Physical parameter list xp1, xd1
e2 = [gamma2, K_dep2, K_c2,cyc_2, kappa2, eta2, f2] # Physical parameter list xp2, xd2
e3 = [gamma3, K_dep3, K_c3,cyc_3, kappa3, eta3, f3] # Physical parameter list xp3, xd3
e4 = [gamma4, K_dep4, K_c4,cyc_4, kappa4, eta4, f4] # Physical parameter list xp4, xd4
processes.append(Process(target=my_cost_function, args=(x1, x3, e1, e3, result_list))) # Parallel computing worker 1 (the M2 chip has 4 high-performance CPU cores running at 3.5 GHz)
processes.append(Process(target=my_cost_function, args=(x1, x4, e1, e4, result_list))) # Parallel computing worker 2
processes.append(Process(target=my_cost_function, args=(x2, x3, e2, e3, result_list))) # Parallel computing worker 3
processes.append(Process(target=my_cost_function, args=(x2, x4, e2, e4, result_list))) # Parallel computing worker 4
for p in processes: # Start the cost function calculation among 4 different choices
p.start()
result = result_list.get() # Get result from worker 1
results.append(result)
result = result_list.get() # Get result from worker 2
results.append(result)
result = result_list.get() # Get result from worker 3
results.append(result)
result = result_list.get() # Get result from worker 4
results.append(result)
rw = 1 / np.square(results[0][0]) + 1 / np.square(results[1][0]) + 1 / np.square(results[2][0]) + 1 / np.square(results[3][0]) # Using the reward function as sum(1 / (err ** 2)) to provide gradient
cost_history.append(min(r[0] for r in results)) # Append the minimum error among the four choices at this step
return -rw
The optimization flow for RL.
train_step = 840 # Maximum training step for this training run (changeable)
class StopTrainingCallback(BaseCallback):
"""
A custom callback that stops the training after a certain number of calls.
:param stop_after_calls: (int) The number of calls to `_on_step` after which training will be stopped.
"""
def __init__(self, stop_after_calls: int = train_step, verbose=0):
super(StopTrainingCallback, self).__init__(verbose)
self.stop_after_calls = stop_after_calls # Initialize the stop signal
self.call_count = 0 # Initialize the counter
def _on_step(self) -> bool:
"""
This method will be called by Stable Baselines3 at each training step.
"""
# Increment the call count
self.call_count += 1
print(self.call_count) # Print out how many calls have been made
# Check if the call count has reached the limit
if self.call_count >= self.stop_after_calls:
if self.verbose > 0:
print(f"Stopping training after {self.stop_after_calls} calls to _on_step.")
return False # Signal to stop training
# Continue training if the limit has not been reached
return True
class CostFunctionEnv(gym.Env):
"""Custom Environment to optimize a cost function using RL"""
def __init__(self, cost_function, num_parameters=44, initial_value=10, bounds=(1, 20), max_steps=10):
super(CostFunctionEnv, self).__init__()
self.cost_function = cost_function
self.initial_state = np.array([10] * 44, dtype=np.float32) # All parameters initialized to 10
self.initial_state[16] = np.random.uniform(0, 20) # The f1 parameter is a randomly generated spectral-diffusion value (simulating the experimental case)
self.initial_state[17] = np.random.uniform(0, 20) # The f2 parameter is a randomly generated spectral-diffusion value (simulating the experimental case)
self.initial_state[18] = np.random.uniform(0, 20) # The f3 parameter is a randomly generated spectral-diffusion value (simulating the experimental case)
self.initial_state[19] = np.random.uniform(0, 20) # The f4 parameter is a randomly generated spectral-diffusion value (simulating the experimental case)
self.state = self.initial_state.copy()
self.current_step = 0 # Step counter
self.max_steps = max_steps # Each optimization will just utilize such number of steps (defined in the initial value)
self.action_space = spaces.Box(low=-1, high=1, shape=(16,), dtype=np.float32) # Action space for the first 16 parameters control
# Define observation space with bounds for the first 16 parameters set to 1-20, 17-20 set to 0-20, and the rest fixed at 10
self.observation_space = spaces.Box(low=np.array([1]*16 + [0]*4 + [10]*24),
high=np.array([20]*20 + [10]*24),
dtype=np.float32)
def step(self, action):
# Update parameters based on action
self.state[:16] += action
self.current_step += 1 # Increment step counter
# Calculate cost
cost = self.cost_function(self.state)
# Reward is negative cost for minimization
reward = -cost
# The episode ends after max_steps steps
done = self.current_step >= self.max_steps
info = {}
return self.state, reward, done, info
def reset(self):
# Reset the state to the initial condition (which contains the random spectral-diffusion values) after finishing max_steps optimization steps
self.state = np.array(self.initial_state, dtype=np.float32)
self.current_step = 0
return self.state
env = CostFunctionEnv(cost_function = reward) # RL environment building with the defined reward
stop_training_callback = StopTrainingCallback(stop_after_calls = train_step, verbose = 1) # Callback definition
model = PPO.load("/Users/linsenli/Documents/fusion240", env = env) # Reload the previously trained model and continue training
Or we can define a new model and start the training from scratch:
policy_kwargs = dict(
net_arch = dict(pi = [64, 64], vf = [64, 64])
) # MLP layer network architecture for policy network and value network
model = PPO("MlpPolicy",# MLP network parameters
env,
verbose = 1,
policy_kwargs = policy_kwargs,
learning_rate = 2e-4,
n_steps = 120,
batch_size = 20,
n_epochs = 10,
gamma = 0.99,
gae_lambda = 0.95,
clip_range = 0.2,
clip_range_vf = None,
ent_coef = 0.0,
vf_coef = 0.5,
max_grad_norm = 0.5,
target_kl = None,
tensorboard_log = None,
seed = None,
device = "auto",
_init_setup_model = True)
The optimization result comparing RL and the traditional gradient-descent method (BFGS): RL converged better with more pre-training.
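The BFGS baseline itself is not part of this notebook; a minimal sketch of how such a comparison could be run (an assumption about the setup, not the authors' exact script, best placed inside the __main__ guard below) is to optimize the same 16 control parameters with SciPy's BFGS, using numerical gradients, on the reward function defined above:
from scipy.optimize import minimize # gradient-based baseline
x_base = np.full(44, 10.0) # same initialization as the RL environment
x_base[16:20] = np.random.uniform(0, 20, 4) # random spectral-diffusion values f1..f4
def bfgs_objective(controls):
    x = x_base.copy()
    x[:16] = controls # only the 16 control parameters are optimized
    return reward(x) # reward() returns the negative reward, so minimizing it maximizes the reward
res = minimize(bfgs_objective, x_base[:16], method='BFGS', options={'maxiter': 30, 'eps': 0.5}) # finite-difference gradients
print(res.x, res.fun)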
if __name__ == "__main__":
start_time = time.perf_counter() # Training time counting start
model.learn(total_timesteps = train_step, callback = stop_training_callback) # Model training
end_time = time.perf_counter() # Training time end
elapsed_time = end_time - start_time # Get the training time requirement
print('training finished, elapsed time:', elapsed_time)
RLeval = [] # Evaluation matrix
for i in range(25): # We use 25 curves to average the result for the convergence
print('eval',i)
list_rewards = []
obs_list = []
cost_history = []
# for episode in range(10):
obs = env.reset()
done = False
total_reward = 0
while not done:
action, _ = model.predict(obs, deterministic=True) # Use deterministic actions for evaluation
obs, reward, done, info = env.step(action)
list_rewards.append(-reward)
obs_list.append(obs)
plt.plot(cost_history) # Each cost_history is an optimization convergence curve driven by the RL agent
RLeval.append(cost_history) # Store the convergence curve in the RLeval variable
# np.savetxt('fusion4_120eval.txt', RLeval, delimiter=',') # Save the evaluation curve results
# model.save("/Users/linsenli/Documents/fusion120") # Save the model for further training
import numpy as np # numpy lib
import torch # Pytorch
import torch.nn as nn # Pytorch neural network
from torch.nn import TransformerEncoder, TransformerEncoderLayer # Pytorch transformer lib
import torch.optim as optim # Pytorch optimizer lib
X= np.loadtxt('Xharder.txt', delimiter=',') # Pre-info of fidelity matrix
R= np.loadtxt('Rharder.txt', delimiter=',') # Pre-info of success rate matrix
R[R == 0] = 1 # Success rate matrix modification (avoid zeros on the diagonal, which would cause errors in 1./R)
X = X.astype(np.float32) # X to float 32
R = R.astype(np.float32) # R to float 32
E0 = 1 - X * np.exp(-1./ R) # Greedy error matrix
Et = torch.from_numpy(E0).float() # Err to pytorch tensor
Xt = torch.from_numpy(X).float() # X to pytorch tensor
Rt = torch.from_numpy(R).float() # R to pytorch tensor
MR = torch.from_numpy(np.exp(-1. / R)) # exp(-1./R) to pytorch tensor
Nqubit = 40 # Qubit number
Npath = 20 # Number of entanglement workers that can work in parallel
rat = 2e5 # Entanglement trial rate
G0 = torch.zeros((Nqubit, Nqubit), dtype=torch.int32) # Connected qubit graph adjacency matrix
Gc = torch.zeros((Nqubit, Nqubit), dtype=torch.int32) # Entanglement trial graph adjacency matrix
P1 = torch.arange(1, 41).repeat(40, 1)/40 # Position embedding for qubit 1
column_values = torch.arange(1, 41) / 40 # Transposition of position embedding 1
P2 = column_values.view(-1, 1).repeat(1, 40) # Position embedding for qubit 2
MX_tensor = Xt.view(1, 1600, 1) # 40*40 tensor reshape to 1*1600*1 tensor
MR_tensor = MR.view(1, 1600, 1) # 40*40 tensor reshape to 1*1600*1 tensor
MG0_tensor = G0.view(1, 1600, 1) # 40*40 tensor reshape to 1*1600*1 tensor
MGc_tensor = Gc.view(1, 1600, 1) # 40*40 tensor reshape to 1*1600*1 tensor
ME_tensor = Et.view(1, 1600, 1) # 40*40 tensor reshape to 1*1600*1 tensor
P1_tensor = P1.view(1, 1600, 1) # 40*40 tensor reshape to 1*1600*1 tensor
P2_tensor = P2.view(1, 1600, 1) # 40*40 tensor reshape to 1*1600*1 tensor
# Transformer input tensor dimension: [batch size, sequence length, feature dimension]
input_data = torch.cat((MX_tensor, MR_tensor, MG0_tensor, MGc_tensor, ME_tensor, P1_tensor, P2_tensor), dim=2) # seven 1*1600*1 tensors merged into one 1*1600*7 input
class UnionFind: # Data structure to store the connected qubit graph efficiently
def __init__(self, size): # qubit graph initialization
self.parent = torch.arange(size, dtype=torch.long) # All the qubit node has a parent node of itself
self.size = torch.ones(size, dtype=torch.long) # All the cluster size to 1
def find(self, x): # find the parent root node of a specific qubit
if self.parent[x] != x: # only the root node fulfills self.parent[x] == x
self.parent[x] = self.find(self.parent[x].item()) # recursively find the root (path compression)
return self.parent[x] # return the root node of a specific qubit
def union(self, x, y): # union two qubit cluster set if they are connected
rootX = self.find(x) # parent root node of the first qubit
rootY = self.find(y) # parent root node of the second qubit
if rootX != rootY: # when the root nodes differ, attach the smaller set to the larger set
if self.size[rootX] < self.size[rootY]:
self.parent[rootX] = rootY
self.size[rootY] += self.size[rootX] # The larger set grows by the size of the smaller one
else:
self.parent[rootY] = rootX
self.size[rootX] += self.size[rootY] # The larger set grows by the size of the smaller one
def findLargestSetSize(self): # Find the largest set size in the data structure
return torch.max(self.size)
def isConnected(self, x, y): # Check whether x and y are in the same set (by checking whether their root parent nodes are the same)
return self.find(x) == self.find(y)
class CustomTransformerModel(nn.Module): # Transformer architecture
def __init__(self, input_features=7, embed_dim=32, output_features=1, seq_len=1600, nhead=1, num_encoder_layers=1): # Transformer parameters
super(CustomTransformerModel, self).__init__() # Transformer initialization
self.seq_len = seq_len # Transformer sequence length
self.linear_transform = nn.Linear(input_features, embed_dim) # Embedding layer (input_features to embed_dim NN)
encoder_layer = TransformerEncoderLayer(d_model=embed_dim, nhead=nhead, dim_feedforward=64) # Transformer encoding layer
self.transformer_encoder = TransformerEncoder(encoder_layer, num_layers=num_encoder_layers) # Transformer multilayer stacking
self.output_layer = nn.Linear(embed_dim, output_features) #output layer from embed_dim to output_features
def forward(self, x): # Forward propagating
x = self.linear_transform(x) # Embedding layer
x = x.permute(1, 0, 2) # Change to [sequence length, batch size, embedding dimension]
x = self.transformer_encoder(x) # Transformer layer
x = x.permute(1, 0, 2) # Change back to [batch size, sequence length, embedding dimension]
x = self.output_layer(x) # Output layer
return x # Get the neural network output result
model = CustomTransformerModel() # Define the model
optimizer = optim.Adam(model.parameters(), lr=1e-2) # Set the optimizer type and learning rate
def findQVmax(delta): # Environment simulation
G0 = torch.zeros((Nqubit, Nqubit), dtype=torch.int32) # Initialized connected qubit graph adjacency matrix
Gc = torch.zeros((Nqubit, Nqubit), dtype=torch.int32) # Initialized entanglement trial graph adjacency matrix
delete = torch.tensor([], dtype=torch.int32) # Entanglement worker list
aval = torch.tensor([], dtype=torch.int32) # Available qubit list
r_ent = torch.zeros(Npath, dtype=Rt.dtype) # Entanglement worker success rate initialization
maxSize = 0 # Maximum connected-cluster size
count = 0 # Environment simulation step
flag = False # State change flag
flag2 = False # Available for next scheduling flag
uf = UnionFind(Nqubit) # Initialize unionfind structure for connected cluster storage
TResult = torch.empty(0, 4, dtype=torch.int32) # Result storage table, updated once qubit 1 and qubit 2 have a successful entanglement: [simulation frame, qubit 1, qubit 2, maximum cluster size]
with torch.no_grad(): # Don't compute the gradient graph (save the memory)
output = model(input_data) # output of action matrix from input embedding
min_val = output.min() # min output
max_val = output.max() # max output
norm_output = (output - min_val)/(max_val - min_val) # normalized the output from 0 to 1
Ent0 = norm_output.view(40,40)*0.1 # restrict the max amplitude to 0.1
Ent = Ent0+delta*0.1 # Random perturbation to the output
Ent[range(len(Ent)), range(len(Ent))] = 0 # Reset the diagonal items to all 0
MA = Ent + Et # Action matrix: greedy result + NN output
new_err = MA.clone() # Copy output action matrix
while len(delete)<Nqubit: # Initialize the entanglement worker to work on all the qubit connections
minVals, _ = torch.min(new_err, dim=0) # Find the minimum err
colInd = torch.argmin(minVals).unsqueeze(0) # Get the column index
rowInd = torch.argmin(new_err[:, colInd]).unsqueeze(0) # Get the row index
delete = torch.cat((delete, rowInd, colInd), dim=0) # Add the index to the entanglement worker
Gc[rowInd, colInd] = 1 # The entanglement trial graph adjacency matrix changes
Gc[colInd, rowInd] = 1 # The entanglement trial graph adjacency matrix changes
new_err[delete, :] = 1 # The cost matrix changes
new_err[:, delete] = 1 # The cost matrix changes
for i in range(Npath): # Assign the success rate of each entanglement worker
r_ent[i] = Rt[delete[2 * i], delete[2 * i + 1]]
while maxSize<30: # Stop condition of cluster building, log2QV<30
while not flag2: # If the next scheduling is not yet possible, keep attempting entanglement until enough free qubits are available for scheduling
while not flag: # if the state is not changing, keep looping until something happens
count += 1 # simulation count increases
for i in range(Npath): # Check all the entanglement worker
if torch.rand(1) < r_ent[i] / rat: # Monte Carlo simulation to see whether the corresponding entanglement attempt is successful
r_ent[i].zero_() # If success, the corresponding entanglement rate become zero
aval = torch.cat((aval, delete[2*i:2*i+2])) # The corresponding two qubits are freed into the available list
G0[delete[2 * i], delete[2 * i + 1]] = 1 # Connected qubit graph adjacency matrix update
G0[delete[2 * i + 1], delete[2 * i]] = 1 # Connected qubit graph adjacency matrix update
uf.union(delete[2 * i], delete[2 * i + 1]) # Unionfind data structure update
maxSize = uf.findLargestSetSize().item() # Find max cluster size
a0 = torch.tensor([count], dtype=torch.int32) # Result data preparation
a1 = delete[2 * i].unsqueeze(0) # Result data preparation
a2 = delete[2 * i + 1].unsqueeze(0) # Result data preparation
a3 = torch.tensor([maxSize], dtype=torch.int32) # Result data preparation
iter_data = torch.cat((a0, a1, a2, a3)).unsqueeze(0) # Merge result data into a table
TResult = torch.cat((TResult, iter_data), dim=0) # Attach to the big result table for future QV calculation
delete[2 * i + 1] = -1 # Mark the entanglement worker entry as idle (-1)
delete[2 * i] = -1 # Mark the entanglement worker entry as idle (-1)
flag = True # State has been changed
if not flag:
break
indices = aval.int() # check the available list now
sub_G0 = G0[indices][:, indices] # build a subgraph with all the available qubit
eye = torch.eye(len(aval), dtype=torch.int32) # Make the diagonal matrix
sub_G0 += eye # Make the diagonal all 1 in the subgraph
isFullyConnected = torch.all(sub_G0 == 1).item() # Check whether the whole matrix is 1 (no free path for scheduling)
flag = False # State flag reset to False to wait for next state change
if not isFullyConnected: # We have enough free qubits for the next schedule
flag2 = True
MG0_tensor = G0.view(1, 1600, 1) # New connected qubit graph adjacency matrix
MGc_tensor = Gc.view(1, 1600, 1) # New entanglement trial graph adjacency matrix
input_data2 = torch.cat((MX_tensor, MR_tensor, MG0_tensor, MGc_tensor, ME_tensor, P1_tensor, P2_tensor), dim=2) # New input features
with torch.no_grad(): # Don't compute the gradient graph (save the memory)
output = model(input_data2) # output of action matrix from input embedding
min_val = output.min() # min output
max_val = output.max() # max output
norm_output = (output - min_val)/(max_val - min_val) # normalized the output from 0 to 1
Ent = norm_output.view(40,40)*0.1+delta*0.1 # Random perturbation to the output
Ent[range(len(Ent)), range(len(Ent))] = 0 # Reset the diagonal items to all 0
MA = Ent + Et # Action matrix: greedy result + NN output
rows, cols = torch.meshgrid(aval, aval, indexing='ij') # We only search for the available scheduling among available qubit list
err = MA.clone() # Copy output action matrix
new_err[rows, cols] = err[rows, cols] # Copy the action-matrix entries for the available qubits
while not torch.all(new_err == 1): # We stop the search when all the available qubits have been assigned for entanglement
minVals, _ = torch.min(new_err, dim=0) # Find the minimum err
colInd = torch.argmin(minVals) # Get the column index
rowInd = torch.argmin(new_err[:, colInd]) # Get the row index
if uf.isConnected(colInd, rowInd): # Check whether these two available qubits are already in the same connected cluster using the UnionFind data structure
new_err[rowInd, colInd] = 1 # If so, change the err matrix so in the next search it will not consider this case
new_err[colInd, rowInd] = 1 # If so, change the err matrix so in the next search it will not consider this case
else:
index = torch.where(delete == -1)[0] # If not, find the idle entanglement worker by delete == -1
index = torch.min(index).item() # Find the first idle worker
delete[index] = rowInd # Assign new task to it
delete[index + 1] = colInd # Assign new task to it
Gc[delete[index], delete[index+1]] = 1 # The entanglement trial graph adjacency matrix changes
Gc[delete[index+1], delete[index]] = 1 # The entanglement trial graph adjacency matrix changes
remove_indices = {rowInd.item(), colInd.item()} # To change the aval qubit list
mask = ~torch.isin(aval, torch.tensor(list(remove_indices), dtype=aval.dtype)) # Mask function for pytorch
aval = aval[mask] # Remove the assigned qubit to the entanglement worker to update the qubit available list
new_err[rowInd, :] = 1 # Err matrix update so in the next search it will not consider this case
new_err[:, rowInd] = 1 # Err matrix update so in the next search it will not consider this case
new_err[colInd, :] = 1 # Err matrix update so in the next search it will not consider this case
new_err[:, colInd] = 1 # Err matrix update so in the next search it will not consider this case
flag2 = False # After assigning all the entanglement workers, there is no available qubit for the next scheduling
flag = False # Wait for the next stage change
for i in range(Npath): # Check all the entanglement workers' tasks and update their success rates
if delete[2*i] > -1: # For every worker that has a task
r_ent[i] = Rt[delete[2 * i], delete[2 * i + 1]] # Update the success rate
if err[delete[2 * i], delete[2 * i + 1]]>0.01: # Apply an error threshold to the entanglement being attempted; if the error is too large
r_ent[i] = 0 # Delete this task
aval = torch.cat((aval, delete[2*i:2*i+2])) # Free the corresponding qubit to the available list
delete[2 * i + 1] = -1 # Make the entanglement worker idle
delete[2 * i] = -1 # Make the entanglement worker idle
e0 = torch.zeros(count, dtype=torch.float) + 0.00001 # error evolution list
Nmax = torch.zeros(count, dtype=torch.int32) # max cluster size evolution list
for i in range(count): # search all the entanglement trial events
for j in range(len(TResult)): # go through the result storage table
if i > TResult[j][0]: # If the counting time is after the success of that entanglement build
e0[i] = e0[i] + 1 - Xt[TResult[j][1], TResult[j][2]] * torch.exp(-(i - TResult[j][0]) / 5e4) # Add the error into the system
for j in range(len(TResult) - 2): # go through the result storage table
Nmax[TResult[j][0]:TResult[j+1][0]] = TResult[j][3] # update the max cluster size evolution list
maxQV = torch.max(torch.minimum(Nmax, 1. / e0)) # calculate the max log2(QV) based on the equation
return -maxQV # return as reward
def funcdif(): # Fitting NN initialization
output = model(input_data)
MA = output.view(40, 40) # resize the output to a 40*40 tensor matrix
criterion = nn.MSELoss() # loss function as the error between output and target
return criterion(MA, Et) # Match the output matrix to the greedy search matrix
num_epochs = 1000 # Number of epochs
for epoch in range(num_epochs): # fitting training process
loss = funcdif() # loss function for fitting
optimizer.zero_grad() # reset gradient
loss.backward() # NN back propagation
optimizer.step() # NN optimization update
for epoch in range(num_epochs): # Real training for optimizing the MA output
delta = torch.rand(40,40)*0.1 # Random perturbation
loss1 = findQVmax(delta*0) # Averaging the environment simulation to get the policy gradient
loss2 = findQVmax(delta*0) # Averaging the environment simulation to get the policy gradient
loss3 = findQVmax(delta*0) # Averaging the environment simulation to get the policy gradient
loss4 = findQVmax(delta*0) # Averaging the environment simulation to get the policy gradient
loss5 = findQVmax(delta*0) # Averaging the environment simulation to get the policy gradient
avg1 = (loss1+loss2+loss3+loss4+loss5)/5 # Mean quantum volume expectation from current policy
loss6 = findQVmax(delta) # Averaging the environment simulation to get the policy gradient
loss7 = findQVmax(delta) # Averaging the environment simulation to get the policy gradient
loss8 = findQVmax(delta) # Averaging the environment simulation to get the policy gradient
loss9 = findQVmax(delta) # Averaging the environment simulation to get the policy gradient
loss10 = findQVmax(delta) # Averaging the environment simulation to get the policy gradient
avg2 = (loss6+loss7+loss8+loss9+loss10)/5 # Mean quantum volume expectation from the current policy with the perturbation added
output = model(input_data) # Gradient graph calculation
if avg1.item()>avg2.item(): # To see whether the perturbation is good or not
target = output + delta.view(1, 1600, 1) # if it is good, we prefer adding this perturbation
else:
target = output - delta.view(1, 1600, 1) # if not, our target is in another direction
criterion = nn.MSELoss() # MSE loss definition
loss = criterion(output,target) # loss as the difference between output and target
optimizer.zero_grad() # reset gradient
loss.backward() # NN back propagation
optimizer.step() # NN optimization update
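As a short usage note (an assumption, not part of the original training script), the trained scheduler can be evaluated by running the environment simulation several times with zero perturbation and averaging the predicted log2(QV):
eval_runs = [-findQVmax(torch.zeros(40, 40)).item() for _ in range(5)] # findQVmax returns the negative log2(QV)
print('mean predicted log2(QV):', float(np.mean(eval_runs)))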






