1
- from unittest import TestCase
2
- from casbin_sqlalchemy_adapter import Adapter , CasbinRule
3
- import casbin
1
+ from casbin_sqlalchemy_adapter import Adapter
2
+ from casbin_sqlalchemy_adapter import Base
3
+ from casbin_sqlalchemy_adapter import CasbinRule
4
4
from sqlalchemy import create_engine
5
5
from sqlalchemy .orm import sessionmaker
6
+ from unittest import TestCase
7
+ import casbin
6
8
import os
9
+ import simpleeval
7
10
8
- dir = os .path .split (os .path .realpath (__file__ ))[0 ]
11
+
12
+ def get_fixture (path ):
13
+ dir_path = os .path .split (os .path .realpath (__file__ ))[0 ] + "/"
14
+ return os .path .abspath (dir_path + path )
9
15
10
16
11
17
def get_enforcer ():
12
- dsn = "sqlite:///" + dir + "/test.db"
13
- adapter = Adapter (dsn )
18
+ engine = create_engine ( "sqlite://" )
19
+ adapter = Adapter (engine )
14
20
15
- engine = create_engine (dsn )
16
21
session = sessionmaker (bind = engine )
22
+ Base .metadata .create_all (engine )
17
23
s = session ()
18
- s .query (CasbinRule ).delete (synchronize_session = False )
19
24
20
25
s .add (CasbinRule (ptype = 'p' , v0 = 'alice' , v1 = 'data1' , v2 = 'read' ))
21
26
s .add (CasbinRule (ptype = 'p' , v0 = 'bob' , v1 = 'data2' , v2 = 'write' ))
@@ -25,7 +30,7 @@ def get_enforcer():
25
30
s .commit ()
26
31
s .close ()
27
32
28
- return casbin .Enforcer (dir + '/ rbac_model.conf' , adapter , True )
33
+ return casbin .Enforcer (get_fixture ( ' rbac_model.conf') , adapter , True )
29
34
30
35
31
36
class TestConfig (TestCase ):
@@ -37,3 +42,63 @@ def test_enforcer_basic(self):
37
42
self .assertTrue (e .enforce ('bob' , 'data2' , 'write' ))
38
43
self .assertTrue (e .enforce ('alice' , 'data2' , 'read' ))
39
44
self .assertTrue (e .enforce ('alice' , 'data2' , 'write' ))
45
+
46
+ def test_add_policy (self ):
47
+ adapter = Adapter ('sqlite://' )
48
+ e = casbin .Enforcer (get_fixture ('rbac_model.conf' ), adapter , True )
49
+
50
+ try :
51
+ self .assertFalse (e .enforce ('alice' , 'data1' , 'read' ))
52
+ self .assertFalse (e .enforce ('bob' , 'data1' , 'read' ))
53
+ self .assertFalse (e .enforce ('bob' , 'data2' , 'write' ))
54
+ self .assertFalse (e .enforce ('alice' , 'data2' , 'read' ))
55
+ self .assertFalse (e .enforce ('alice' , 'data2' , 'write' ))
56
+ except simpleeval .NameNotDefined :
57
+ # This is caused by an upstream bug when there is no policy loaded
58
+ # Should be resolved in pycasbin >= 0.3
59
+ pass
60
+
61
+ adapter .add_policy (sec = None , ptype = 'p' , rule = ['alice' , 'data1' , 'read' ])
62
+ adapter .add_policy (sec = None , ptype = 'p' , rule = ['bob' , 'data2' , 'write' ])
63
+ adapter .add_policy (sec = None , ptype = 'p' , rule = ['data2_admin' , 'data2' , 'read' ])
64
+ adapter .add_policy (sec = None , ptype = 'p' , rule = ['data2_admin' , 'data2' , 'write' ])
65
+ adapter .add_policy (sec = None , ptype = 'g' , rule = ['alice' , 'data2_admin' ])
66
+
67
+ e .load_policy ()
68
+
69
+ self .assertTrue (e .enforce ('alice' , 'data1' , 'read' ))
70
+ self .assertFalse (e .enforce ('bob' , 'data1' , 'read' ))
71
+ self .assertTrue (e .enforce ('bob' , 'data2' , 'write' ))
72
+ self .assertTrue (e .enforce ('alice' , 'data2' , 'read' ))
73
+ self .assertTrue (e .enforce ('alice' , 'data2' , 'write' ))
74
+ self .assertFalse (e .enforce ('bogus' , 'data2' , 'write' ))
75
+
76
+ def test_save_policy (self ):
77
+ model = casbin .Enforcer (get_fixture ('rbac_model.conf' ), get_fixture ('rbac_policy.csv' )).model
78
+ adapter = Adapter ('sqlite://' )
79
+ adapter .save_policy (model )
80
+ e = casbin .Enforcer (get_fixture ('rbac_model.conf' ), adapter )
81
+
82
+ self .assertTrue (e .enforce ('alice' , 'data1' , 'read' ))
83
+ self .assertFalse (e .enforce ('bob' , 'data1' , 'read' ))
84
+ self .assertTrue (e .enforce ('bob' , 'data2' , 'write' ))
85
+ self .assertTrue (e .enforce ('alice' , 'data2' , 'read' ))
86
+ self .assertTrue (e .enforce ('alice' , 'data2' , 'write' ))
87
+
88
+ def test_str (self ):
89
+ rule = CasbinRule (ptype = 'p' , v0 = 'alice' , v1 = 'data1' , v2 = 'read' )
90
+ self .assertEqual (str (rule ), 'p, alice, data1, read' )
91
+
92
+ def test_repr (self ):
93
+ rule = CasbinRule (ptype = 'p' , v0 = 'alice' , v1 = 'data1' , v2 = 'read' )
94
+ self .assertEqual (repr (rule ), '<CasbinRule None: "p, alice, data1, read">' )
95
+ engine = create_engine ("sqlite://" )
96
+
97
+ session = sessionmaker (bind = engine )
98
+ Base .metadata .create_all (engine )
99
+ s = session ()
100
+
101
+ s .add (rule )
102
+ s .commit ()
103
+ self .assertRegex (repr (rule ), r'<CasbinRule \d+: "p, alice, data1, read">' )
104
+ s .close ()
0 commit comments