-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPC.py
345 lines (308 loc) · 15.1 KB
/
PC.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
import pyAgrum as gum
from itertools import product,combinations
class PC():
def __init__(self,csvFilePath:str) -> None:
self.learner=gum.BNLearner(csvFilePath)
self.G,self.sepSet=self.initialisation()
self.verbose=""
self.GPhase21=None
####### PHASE 1 #######
def phase1(self,nivRisque=0.05):
""" Phase 1 de l'algorithme PC
"""
d=0
ConditionOnAdjX=True
while ConditionOnAdjX:
for X,Y in self.G.edges(): # Ligne 5 :
adjX=self.G.adjacents(X)# foreach arête X-Y tq |Adj(X)\{Y}|≥d
if len(adjX)-1 >=d: #
adjSansY=adjX.copy() # Ligne 6,7 et 12 :
adjSansY.remove(Y) # Choisir un Z in Adj(X)\{Y} tq |Z|=d
for Z in list(combinations(adjSansY,d)): # until tous les Z de taille d ont été testés
if(self.testIndepG2(X, Y, kno=Z, nivRisque=nivRisque)): # Si X indep Y | Z #On peut utiliser Chi2 ou G2 PB TODO
self.G.eraseEdge(X,Y)
for z in Z:
self.sepSet[(X,Y)].add(z)
self.sepSet[(Y,X)].add(z)
break
d+=1
compteur=0
for X in self.G.nodes(): # Ligne 14
if len(self.G.adjacents(X))<=d: #
compteur+=1 # compter pour combien de X |Adj(X)|≤d
else:
break # Si un noeud a plus de d noeuds adjacents, on n'a pas besoin de regarder les autres
if(len(self.G.nodes())==compteur): # Si tous les noeuds vérifie on arrête
ConditionOnAdjX=False #
def phase1_PC_CSS(self,nivRisque=0.05,G1=gum.MixedGraph(),G2=gum.MixedGraph()):
""" NewStep1(G1|G2) de PC-CSS
Les self.G sont remplacé par G1
aSansY est remplacé par set(aSansY).intersection(self.findConsistentSet(X,Y,G2))
"""
d=0
ConditionOnAdjX=True
while ConditionOnAdjX:
a=dict()
for X in G1.nodes():
a[X]=G1.adjacents(X)
for X,Y in G1.edges():
if len(a[X])-1 >=d:
aSansY=a[X].copy()
aSansY.remove(Y)
aSansYEtConsist=set(aSansY).intersection(self.findConsistentSet(X,Y,G2))
for Z in list(combinations(aSansYEtConsist,d)):
if(self.testIndepG2(X, Y, kno=Z, nivRisque=nivRisque)):
G1.eraseEdge(X,Y)
for z in Z:
self.sepSet[(X,Y)].add(z)
self.sepSet[(Y,X)].add(z)
break
d+=1
compteur=0
for X in G1.nodes(): # Ligne 14
if len(G1.adjacents(X))<=d: #
compteur+=1 # compter pour combien de X |Adj(X)|≤d
else:
break # Si un noeud a plus de d noeuds adjacents, on n'a pas besoin de regarder les autres
if(len(G1.nodes())==compteur): # Si tous les noeuds vérifie on arrête
ConditionOnAdjX=False
def phase1_STABLE(self,nivRisque=0.05):
""" Phase 1 de l'algorithme PC-Stable
"""
##print("Avant Phase 1")
# #gnb.sideBySide(self.G)
d=0
ConditionOnAdjX=True
while ConditionOnAdjX:
a=dict()
for X in self.G.nodes():
a[X]=self.G.adjacents(X)
for X,Y in self.G.edges():
if len(a[X])-1 >=d:
aSansY=a[X].copy()
aSansY.remove(Y)
for Z in list(combinations(aSansY,d)):
if(self.testIndepG2(X, Y, kno=Z, nivRisque=nivRisque)):
self.G.eraseEdge(X,Y)
for z in Z:
self.sepSet[(X,Y)].add(z)
self.sepSet[(Y,X)].add(z)
break
d+=1
compteur=0
for X in self.G.nodes(): # Ligne 14
if len(self.G.adjacents(X))<=d: #
compteur+=1 # compter pour combien de X |Adj(X)|≤d
else:
break # Si un noeud a plus de d noeuds adjacents, on n'a pas besoin de regarder les autres
if(len(self.G.nodes())==compteur): # Si tous les noeuds vérifie on arrête
ConditionOnAdjX=False
####### PHASE 2 #######
def phase2(self):
""" Phase 2 de l'algorithme PC, elle permet d'orienter des arêtes pour ne pas avoir plus de V-Struct et de cycles
Après cette phase, G est un graphe essentiel
"""
L=self.findUnshieldedTriple()
hasGoneIn=True
#Phase 2.1 : Orientation des V-Struct
while(hasGoneIn):#Step 2
hasGoneIn=False
for (X,Z,Y) in L:
if Z not in self.sepSet[(X,Y)]:
self.G.eraseEdge(X,Z)
self.G.eraseEdge(Y,Z)
self.G.addArc(X,Z)
self.G.addArc(Y,Z)
L=self.findUnshieldedTriple() #on doit recalculer les UnshieldedTriple dynamiquement dès qu'on change le graphe G...
#sinon il se peut qu'un des triple ait des éléments en commun avec le triple
#pour lequel on a introduit une V-Structure et cela peut mener à la création d'un cycle
hasGoneIn=True #Pour ne pas avoir une boucle infinie quand on ne peut pas orienter des arcs
break
# Phase 2.2 : Propagations, les orientation que la phase 2.1 implique
self.GPhase21=gum.MixedGraph(self.G)#Pour la verbose
self.verbose+="\n"+"Phase 2" #Pour la verbose
nb_iteration=1 #Pour la verbose
plusDarreteOrientable = False #Condition d'arrêt
while not plusDarreteOrientable : #Step3
#Verbose
self.verbose+="\n"+f"Itération n°{nb_iteration}"
nb_iteration+=1
#Condition d'arrêt
plusDarreteOrientable=True
tripletsTemp=list(product(self.G.nodes(),self.G.nodes(),self.G.nodes()))
toDelete=[]
for (i,j,k) in tripletsTemp: #Enlever les triplets avec les mêmes composantes
if(i==j or i==k or j==k):
toDelete.append((i,j,k))
for triple in toDelete:
tripletsTemp.remove(triple)
# if triple in tripletsTemp:
# tripletsTemp.remove(triple)
#Liste de tous les quadruplets
quadrupletsTemp=list(product(self.G.nodes(),self.G.nodes(),self.G.nodes(),self.G.nodes()))
toDelete=[]
for (i,j,k,l) in quadrupletsTemp:#Enlever les quadruplets avec les mêmes composantes
if(i==j or i==k or i==l or j==k or j==l or k==l):
toDelete.append((i,j,k,l))
for quadruple in toDelete:
quadrupletsTemp.remove(quadruple)
# if quadruple in quadrupletsTemp:
# quadrupletsTemp.remove(quadruple)
#R2 (R3 dans pseudo code prof)
# Ici c'est R3 dans le pseudo code du cours, le R2 du papier n'empechait que les
# cycles de taille 3
for (X,Y) in list(product(self.G.nodes(),self.G.nodes())):
self.verbose+="\n"+f"in R2 : (X,Y)={(X,Y)}"
if self.G.existsEdge(X,Y) and self.G.hasDirectedPath(X,Y):
self.verbose+="\n"+f"in R2 : (X,Y)={(X,Y)}, on oriente {X}->{Y}"
self.G.eraseEdge(X,Y)
self.G.addArc(X, Y)
#R3 : une règle pas dans le pseudo code du cours qui évite les "nouvelles" v-struct OU un cycle
for (i,j,k,l) in quadrupletsTemp:
if(self.G.existsEdge(i,j) and self.G.existsEdge(i,k) and self.G.existsArc(k,j) and self.G.existsArc(l,j) and self.G.existsEdge(i,l) and not self.G.existsEdge(k,l)):
self.G.eraseEdge(i,j)
self.G.addArc(i,j)
plusDarreteOrientable=False
break
#R1 (R2 dans pseudo code prof) pour éviter la création de v-struct
for (i,j,k) in tripletsTemp:
self.verbose+="\n"+f"in R1 : (i,j,k)={(i,j,k)}"
if(self.G.existsArc(i,j) and self.G.existsEdge(j,k) and not self.G.existsEdge(i,k)):
self.verbose+="\n"+f"in R1 : (i,j,k)={(i,j,k)}, on oriente {j}->{k}"
self.G.eraseEdge(j,k)
self.G.addArc(j, k)
plusDarreteOrientable=False
break
# Nous avons choisis de mettre R2 et R3 avant R1 pour limiter la création de cycle
# Pendant l'exécution de l'algorithme. La règle R3, si appliquée, en particulier construit
# de "nouvelles" v-struct qui interdise l'ajout d'un nouveau cycle.
#
# Dans les situations où nous allons appliquer R3, il est en effet préférable d'introduire
# ces v-struct plutôt que de possiblement d'introduire des cycles en faisant R1 avant.
####### FONCTIONS UTILITAIRES #######
def initialisation(self):
""" Initialise l'algorithme PC :
crée un graphe non orienté complet et initialise les sepset vides
Returns
-------
UndiGraph Graph et Dict
G : un graphe complet non orienté et
sepSet : un dictionnaire d'ensemble séparant vides pour toutes paires de noeuds
"""
G=gum.MixedGraph()
G.addNodes(len(self.learner.names()))
sepSet=dict()
for node1,node2 in list(product(G.nodes(),G.nodes())): #produit cartésien de G.nodes
if(not G.existsEdge(node2,node1) and node1!=node2):
G.addEdge(node1,node2)
sepSet[(node1,node2)]=set()
sepSet[(node2,node1)]=sepSet[(node1,node2)]
return G,sepSet#,G_directed
def findUnshieldedTriple(self)->list:
""" Permet de trouver les unshielded triple
Renvoie la liste des id des triplets concernés
"""
triples=[]
for Z in self.G.nodes():
for X in self.G.nodes():
if Z == X or not self.G.existsEdge(X,Z): #X-Z : X est connecté à Z
continue
for Y in self.G.nodes():
if Y == X or Y == Z or (not self.G.existsEdge(Y,Z)) or self.G.existsEdge(X,Y) or self.G.existsArc(X, Y) or self.G.existsArc(Y,X): #X-Z-Y : X est connecté à Z et Z connecté à Y
continue
triples.append((X,Z,Y))
l=[]#Enlever les doublons
for i in range(len(triples)):
triple1=triples[i]
for j in range(i+1,len(triples)):
triple2=triples[j]
##print(triple1,triple2,triple1[0] in triple2 and triple1[1] in triple2 and triple1[2] in triple2)
if(triple1[0] in triple2 and triple1[1] in triple2 and triple1[2] in triple2):
l.append(triple2)
for triple in l:
if triple in triples:
triples.remove(triple)
return triples
def findConsistentSet(self,X,Y,G2):
"""Fonction qui trouve le consistent set de deux noeuds X,Y dans un graphe G2
Parameters
----------
X : int
id d'un noeud dans G
Y : int
id d'un noeud dans G
G2 : pyAgrum.MixedGraph
Graphe dans lequel calculer le consistent set
Returns
-------
set
l'ensemble consistent de x et y dans G2
"""
consistentSet=set()
for Z in G2.adjacents(X):
if Z!=Y and not G2.existsArc(X,Z):
if len(G2.mixedUnorientedPath(X,Z))!=0 and len(G2.mixedUnorientedPath(Z,Y))!=0:
consistentSet.add(Z)
return consistentSet
def testIndepChi2(self, var1, var2, kno=[], nivRisque=0.05,verbose=False):
""" Effectue un test chi2 d'indépendance entre var1 et var2 conditionnellement à l'ensemble kno
Parameters
----------
var1 : int
L'id de la première variable avec laquelle tester l'indépendance
var2 : int
L'id de la deuxième variable avec laquelle tester l'indépendance
kno=[] : list[int]
La liste des ids des variables qui conditionnent l'indépendance à tester
nivRisque : float
Le niveau de risque (souvent 5%)
Returns
-------
bool
Vrai si il y a indépendance et faux sinon
"""
nameVar1=f"n_{var1}"#=nom du noeud d'ID var 1 dans le BN, pour donner des visualisations cohérentes car id dans learner ≠ id dans bn
nameVar2=f"n_{var2}"
names_kno=[f"n_{var}" for var in kno]
stat,pvalue=self.learner.chi2(nameVar1,nameVar2,names_kno)
if(verbose):
if len(kno)==0:
print("Le test Chi2 indique que '{}' indépendant de '{}' ==> {}".format(nameVar1,nameVar2,pvalue>=nivRisque))
pass
else:
print("Le test Chi2 indique que '{}' indépendant de '{}' étant donné {} ==> {}".format(nameVar1,nameVar2,names_kno,pvalue>=nivRisque))
pass
if pvalue>=nivRisque:
return True
return False
def testIndepG2(self, var1, var2, kno=[], nivRisque=0.05,verbose=False):
""" Effectue un test G2 d'indépendance entre var1 et var2 conditionnellement à l'ensemble kno
Parameters
----------
var1 : int
L'id de la première variable avec laquelle tester l'indépendance
var2 : int
L'id de la deuxième variable avec laquelle tester l'indépendance
kno=[] : list[int]
La liste des ids des variables qui conditionnent l'indépendance à tester
nivRisque : float
Le niveau de risque (souvent 5%)
Returns
-------
bool
Vrai si il y a indépendance et faux sinon
"""
nameVar1=f"n_{var1}"
nameVar2=f"n_{var2}"
names_kno=[f"n_{var}" for var in kno]
stat,pvalue=self.learner.G2(nameVar1,nameVar2,names_kno)
if(verbose):
if len(kno)==0:
print("Le test G2 indique que '{}' indépendant de '{}' ==> {}".format(nameVar1,nameVar2,pvalue>=nivRisque))
pass
else:
print("Le test G2 indique que '{}' indépendant de '{}' étant donné {} ==> {}".format(nameVar1,nameVar2,names_kno,pvalue>=nivRisque))
pass
if pvalue>=nivRisque:
return True
return False