1+ # SimQN: a discrete-event simulator for the quantum networks
2+ # Copyright (C) 2021-2022 Lutong Chen, Jian Li, Kaiping Xue
3+ # University of Science and Technology of China, USTC.
4+ #
5+ # This program is free software: you can redistribute it and/or modify
6+ # it under the terms of the GNU General Public License as published by
7+ # the Free Software Foundation, either version 3 of the License, or
8+ # (at your option) any later version.
9+ #
10+ # This program is distributed in the hope that it will be useful,
11+ # but WITHOUT ANY WARRANTY; without even the implied warranty of
12+ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+ # GNU General Public License for more details.
14+ #
15+ # You should have received a copy of the GNU General Public License
16+ # along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
18+ from typing import Callable , Dict , List , Tuple , Union
19+ import math
20+
21+ from qns .entity .node .node import QNode
22+ from qns .entity .qchannel .qchannel import QuantumChannel
23+ from qns .entity .cchannel .cchannel import ClassicChannel
24+ from qns .network .route .route import RouteImpl , NetworkRouteError
25+ import heapq
26+
27+
28+ class heapitem :
29+ def __init__ (self , item : object , key : float ) -> None :
30+ self .item = item
31+ self .key = key
32+
33+ def __lt__ (self , other ) -> bool :
34+ return self .key < other .key
35+
36+ def __iter__ (self ):
37+ return iter ((self .item , self .key ))
38+
39+
40+ class DijkstraRouteAlgorithmHeap (RouteImpl ):
41+ """
42+ This is the dijkstra route algorithm implement
43+ """
44+
45+ INF = math .inf
46+
47+ def __init__ (
48+ self ,
49+ name : str = "dijkstra" ,
50+ metric_func : Callable [[Union [QuantumChannel , ClassicChannel ]], float ] = None ,
51+ ) -> None :
52+ """
53+ Args:
54+ name: the routing algorithm's name
55+ metric_func: the function that returns the metric for each channel.
56+ The default is the const function m(l)=1
57+ """
58+ self .name = name
59+ self .route_table = {}
60+
61+ if metric_func is None :
62+ self .metric_func = lambda _ : 1
63+ else :
64+ self .metric_func = metric_func
65+
66+ self .pathset = {}
67+
68+ def build (
69+ self , nodes : List [QNode ], channels : List [Union [QuantumChannel , ClassicChannel ]]
70+ ):
71+ neighbors_table = {node : [] for node in nodes }
72+ for channel in channels :
73+ assert len (channel .node_list ) == 2
74+ metric = self .metric_func (channel )
75+
76+ [node1 , node2 ] = channel .node_list
77+ neighbors_table [node1 ].append ((node2 , metric ))
78+ neighbors_table [node2 ].append ((node1 , metric ))
79+
80+ for srcn in nodes :
81+ nodes_cost = {node : self .INF for node in nodes }
82+ nodes_cost [srcn ] = 0
83+ prev_node = {node : None for node in nodes }
84+
85+ heap = []
86+ heapq .heappush (heap , heapitem (srcn , 0 ))
87+
88+ while len (heap ) != 0 :
89+ (node , cost ) = heapq .heappop (heap )
90+
91+ if cost > nodes_cost [node ]:
92+ continue
93+
94+ for neigh , metric in neighbors_table [node ]:
95+ neigh_new_cost = cost + metric
96+ if neigh_new_cost < nodes_cost [neigh ]:
97+ nodes_cost [neigh ] = neigh_new_cost
98+ heapq .heappush (heap , heapitem (neigh , nodes_cost [neigh ]))
99+ prev_node [neigh ] = node
100+
101+ srcpath = {}
102+ for dst in nodes :
103+ if nodes_cost [dst ] == self .INF :
104+ continue
105+
106+ if dst in srcpath :
107+ continue
108+
109+ path = []
110+ curr = dst
111+ while curr is not None :
112+ path .append (curr )
113+ curr = prev_node [curr ]
114+ path .reverse ()
115+
116+ while len (path ) != 0 :
117+ if path [- 1 ] in srcpath :
118+ break
119+ srcpath [path [- 1 ]] = [nodes_cost [path [- 1 ]], path [:]]
120+ path .pop ()
121+ self .route_table [srcn ]= srcpath
122+
123+ def query (self , src : QNode , dest : QNode ) -> List [Tuple [float , QNode , List [QNode ]]]:
124+ """
125+ query the metric, nexthop and the path
126+
127+ Args:
128+ src: the source node
129+ dest: the destination node
130+
131+ Returns:
132+ A list of route paths. The result should be sortted by the priority.
133+ The element is a tuple containing: metric, the next-hop and the whole path.
134+ """
135+ src_routelist : Dict [QNode , List [float , List [QNode ]]] = self .route_table .get (src , None )
136+ if src_routelist is None :
137+ return []
138+
139+ dst_path = src_routelist .get (dest , None )
140+ if dst_path is None :
141+ return []
142+
143+ try :
144+ metric = dst_path [0 ]
145+ path : List [QNode ] = dst_path [1 ][:]
146+ if len (path ) <= 1 or metric == self .INF :
147+ next_hop = None
148+ return []
149+ else :
150+ next_hop = path [1 ]
151+ return [(metric , next_hop , path )]
152+ except Exception :
153+ return []
0 commit comments