13
13
import os
14
14
import random
15
15
import re
16
- import shutil
17
16
import socket
18
17
import threading
19
18
import time
@@ -156,7 +155,7 @@ def test_untagged(self):
156
155
self .verify_events_log (event_log )
157
156
158
157
159
- class FaucetSingle8021XSuccessTest (FaucetUntaggedTest ):
158
+ class FaucetUntagged8021XSuccessTest (FaucetUntaggedTest ):
160
159
161
160
SOFTWARE_ONLY = True
162
161
@@ -223,7 +222,7 @@ class FaucetSingle8021XSuccessTest(FaucetUntaggedTest):
223
222
network={
224
223
key_mgmt=IEEE8021X
225
224
eap=MD5
226
- identity="user"
225
+ identity="user@example.com "
227
226
password="microphone"
228
227
}
229
228
"""
@@ -248,19 +247,11 @@ def _write_faucet_config(self):
248
247
249
248
self .CONFIG = self .CONFIG .replace ('NFV_INTF' , str (nfv_intf ))
250
249
self .CONFIG_GLOBAL = self .CONFIG_GLOBAL .replace ("NFV_MAC" , nfv_intf .MAC ())
251
- super (FaucetSingle8021XSuccessTest , self )._write_faucet_config ()
250
+ super (FaucetUntagged8021XSuccessTest , self )._write_faucet_config ()
252
251
253
252
def setUp (self ):
254
- super (FaucetSingle8021XSuccessTest , self ).setUp ()
253
+ super (FaucetUntagged8021XSuccessTest , self ).setUp ()
255
254
self .host_drop_all_ips (self .nfv_host )
256
- self .radius_port = 1812
257
- # self.radius_port = mininet_test_util.find_free_port(
258
- # self.ports_sock, self._test_name())
259
- self .start_freeradius ()
260
-
261
- def tearDown (self ):
262
- self .nfv_host .cmd ('kill %d' % self .freeradius_pid )
263
- super (FaucetSingle8021XSuccessTest , self ).tearDown ()
264
255
265
256
def try_8021x (self , and_logff = False ):
266
257
tcpdump_filter = 'ether proto 0x888e'
@@ -272,28 +263,22 @@ def try_8021x(self, and_logff=False):
272
263
273
264
def test_untagged (self ):
274
265
tcpdump_txt = self .try_8021x (and_logff = True )
275
-
276
266
self .assertIn ('Success' , tcpdump_txt )
277
267
self .assertEqual (
278
268
1 ,
279
- self .scrape_prometheus_var ('dp_dot1x_success' , default = 0 ))
280
- self .assertEqual (
281
- 1 ,
282
- self .scrape_prometheus_var ('port_dot1x_success' , labels = {'port' : 1 }, default = 0 ))
269
+ self .scrape_prometheus_var ('dp_dot1x_success' , any_labels = True , default = 0 ))
283
270
self .assertEqual (
284
271
0 ,
285
- self .scrape_prometheus_var ('dp_dot1x_failure' , default = 0 ))
286
- self .assertEqual (
287
- 0 ,
288
- self .scrape_prometheus_var ('port_dot1x_failure' , labels = {'port' : 1 }, default = 0 ))
272
+ self .scrape_prometheus_var ('dp_dot1x_failure' , any_labels = True , default = 0 ))
289
273
self .assertEqual (
290
274
1 ,
291
- self .scrape_prometheus_var ('dp_dot1x_logoff' , default = 0 ))
275
+ self .scrape_prometheus_var ('port_dot1x_success' , any_labels = True , default = 0 ))
292
276
self .assertEqual (
293
- 1 ,
294
- self .scrape_prometheus_var ('port_dot1x_logoff ' , labels = { 'port' : 1 } , default = 0 ))
277
+ 0 ,
278
+ self .scrape_prometheus_var ('port_dot1x_failure ' , any_labels = True , default = 0 ))
295
279
self .assertIn ('Success' , tcpdump_txt )
296
280
self .assertIn ('logoff' , tcpdump_txt )
281
+ # TODO check prometheus dp/port_dot1x_logoff once logoff_handler implemented on chewie side.
297
282
298
283
def wpa_supplicant_callback (self , and_logoff ):
299
284
wpa_ctrl_path = os .path .join (
@@ -314,100 +299,36 @@ def wpa_supplicant_callback(self, and_logoff):
314
299
break
315
300
time .sleep (1 )
316
301
self .assertEqual (eap_state , 'SUCCESS' )
317
- self .wait_until_matching_flow (
318
- {'eth_src' : self .eapol_host .MAC (), 'in_port' : 1 }, table_id = 0 )
319
-
320
302
self .eapol_host .cmd ('wpa_cli -p %s logoff' % wpa_ctrl_path )
321
303
322
- for i in range (10 ):
323
- if not self .matching_flow_present (
324
- {'eth_src' : self .eapol_host .MAC (), 'in_port' : 1 }, table_id = 0 ):
325
- break
326
- time .sleep (1 )
327
- else :
328
- self .fail ('authentication flow was not removed.' )
329
-
330
- def wait_for_radius (self , radius_log_path , timeout = 10 ):
331
- for i in range (timeout ):
332
- if os .path .exists (radius_log_path ):
333
- break
334
- time .sleep (1 )
335
- else :
336
- self .fail ('could not open radius log after %d seconds' % timeout )
337
-
338
- with open (radius_log_path , 'r' ) as log :
339
- while True :
340
- line = log .readline ()
341
- if not line :
342
- time .sleep (1 )
343
- continue
344
- if line .strip () == 'Ready to process requests.' :
345
- return
346
-
347
- def start_freeradius (self ):
348
- with open ('/etc/freeradius/users' , 'w' ) as f :
349
- f .write ('user Cleartext-Password := "microphone"' )
350
-
351
- with open ('/etc/freeradius/clients.conf' , 'w' ) as f :
352
- f .write ('''client localhost {
353
- ipaddr = 127.0.0.1
354
- secret = SECRET
355
- }''' )
356
-
357
- radius_log_path = '%s/radius.log' % self .tmpdir
358
- shutil .copytree ('/etc/freeradius/' , '%s/freeradius' % self .tmpdir )
359
- os .system ('chmod o+rx %s' % self .root_tmpdir )
360
- os .system ('chown -R root:freerad %s/freeradius/*' % self .tmpdir )
361
- os .system ('chown root:freerad %s/freeradius' % self .tmpdir )
362
-
363
- with open ('%s/freeradius/radiusd.conf' % self .tmpdir , 'r+' ) as radiusd_file :
364
- config = radiusd_file .read ()
365
- radiusd_file .seek (0 )
366
- radiusd_file .truncate ()
367
- new_config = config .replace ('port = 0' , 'port = %d' % self .radius_port , 2 )
368
- radiusd_file .write (new_config )
369
304
370
- self .nfv_host .cmd ('freeradius -sxx -l %s -d %s/freeradius &' % (radius_log_path , self .tmpdir ))
371
-
372
- self .freeradius_pid = self .nfv_host .lastPid
373
- self .wait_for_radius (radius_log_path )
374
- return radius_log_path
375
-
376
-
377
- class FaucetSingle8021XFailureTest (FaucetSingle8021XSuccessTest ):
305
+ class FaucetUntagged8021XFailureTest (FaucetUntagged8021XSuccessTest ):
378
306
"""Failure due to incorrect identity/password"""
379
307
380
308
wpasupplicant_conf = """
381
309
ap_scan=0
382
310
network={
383
311
key_mgmt=IEEE8021X
384
312
eap=MD5
385
- identity="user"
313
+ identity="user@example.com "
386
314
password="wrongpassword"
387
315
}
388
316
"""
389
317
390
318
def test_untagged (self ):
391
319
tcpdump_txt = self .try_8021x (and_logff = False )
392
320
self .assertIn ('Failure' , tcpdump_txt )
321
+ faucet_log = self .env ['faucet' ]['FAUCET_LOG' ]
322
+ with open (faucet_log , 'r' ) as log :
323
+ faucet_log_txt = log .read ()
324
+ self .assertNotIn ('Successful auth' , faucet_log_txt )
393
325
self .assertEqual (
394
326
0 ,
395
- self .scrape_prometheus_var ('dp_dot1x_success' , default = 0 ))
327
+ self .scrape_prometheus_var ('dp_dot1x_success' , labels = { 'port' : 1 }, default = 0 ))
396
328
self .assertEqual (
397
329
0 ,
398
330
self .scrape_prometheus_var ('port_dot1x_success' , labels = {'port' : 1 }, default = 0 ))
399
- self .assertEqual (
400
- 0 ,
401
- self .scrape_prometheus_var ('dp_dot1x_logoff' , default = 0 ))
402
- self .assertEqual (
403
- 0 ,
404
- self .scrape_prometheus_var ('port_dot1x_logoff' , labels = {'port' : 1 }, default = 0 ))
405
- self .assertEqual (
406
- 1 ,
407
- self .scrape_prometheus_var ('dp_dot1x_failure' , default = 0 ))
408
- self .assertEqual (
409
- 1 ,
410
- self .scrape_prometheus_var ('port_dot1x_failure' , labels = {'port' : 1 }, default = 0 ))
331
+ # TODO add prometheus dp/port_dot1x_failure check once failure handler is implemented on chewie side.
411
332
412
333
413
334
class FaucetUntaggedRandomVidTest (FaucetUntaggedTest ):
0 commit comments