Skip to content

Commit 1fc6368

Browse files
committed
Added CV to guard calls to Channel.read
1 parent 2c0e554 commit 1fc6368

File tree

2 files changed

+42
-13
lines changed

2 files changed

+42
-13
lines changed

async/Async_OpenFlow0x01.ml

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,14 @@ module Controller = struct
334334
module Log = Async_OpenFlow_Log
335335
let tags = [("openflow", "openflow0x01")]
336336

337+
(* We can not call read() on the same pipe concurrently.
338+
Somehow this is happening sometimes, so we need to
339+
enforce this invariant locally with condition variables. *)
340+
341+
let read_outstanding = ref false
342+
343+
let read_finished = Condition.create ()
344+
337345
module Client_id = ControllerProcess.Client_id
338346
type t = ([ `Barrier of SwitchMap.key
339347
| `Individual_stats of
@@ -381,36 +389,47 @@ module Controller = struct
381389
| `Send_pkt_out_resp of (unit, exn) Result.t
382390
| `Aggregate_stats_resp of (OpenFlow0x01_Stats.aggregateStats, exn) Result.t
383391
]) Channel.t
392+
let rec clear_to_read () = if (!read_outstanding)
393+
then Condition.wait read_finished >>= clear_to_read
394+
else return (read_outstanding := true)
395+
396+
let signal_read () = read_outstanding := false;
397+
Condition.broadcast read_finished ()
384398

385399
let aggregate_stats ?(pattern=C.match_all) (t : t) sw_id =
400+
clear_to_read () >>= fun () ->
386401
Log.debug ~tags "aggregate_stats (local)";
387402
Channel.write t (`Aggregate_stats (pattern, sw_id));
388403
Channel.read t >>| function
389-
| `Aggregate_stats_resp resp -> resp
404+
| `Aggregate_stats_resp resp -> signal_read (); resp
390405

391406
let send_pkt_out (t : t) (sw_id:Client_id.t) pkt_out =
407+
clear_to_read () >>= fun () ->
392408
Log.debug ~tags "send_pkt_out (local)";
393409
Channel.write t (`Send_pkt_out (sw_id, pkt_out));
394410
Channel.read t >>| function
395-
| `Send_pkt_out_resp resp -> resp
411+
| `Send_pkt_out_resp resp -> signal_read (); resp
396412

397413
let send_flow_mods ?(clear=true) (t : t) (sw_id:Client_id.t) flow_mods =
414+
clear_to_read () >>= fun () ->
398415
Log.debug ~tags "send_flow_mods (local)";
399416
Channel.write t (`Send_flow_mods (clear, sw_id, flow_mods));
400417
Channel.read t >>| function
401-
| `Send_flow_mods_resp resp -> resp
418+
| `Send_flow_mods_resp resp -> signal_read (); resp
402419

403420
let clear_flows ?(pattern=C.match_all) (t : t) (sw_id:Client_id.t) =
421+
clear_to_read () >>= fun () ->
404422
Log.debug ~tags "clear_flows (local)";
405423
Channel.write t (`Clear_flows (pattern, sw_id));
406424
Channel.read t >>| function
407-
| `Clear_flows_resp resp -> resp
425+
| `Clear_flows_resp resp -> signal_read (); resp
408426

409427
let get_switches (t : t) =
428+
clear_to_read () >>= fun () ->
410429
Log.debug ~tags "get_switches (local)";
411430
Channel.write t `Get_switches;
412431
Channel.read t >>| function
413-
| `Get_switches_resp resp -> resp
432+
| `Get_switches_resp resp -> signal_read (); resp
414433

415434
let set_kill_wait t (s:Time.Span.t) =
416435
Log.debug ~tags "set_kill_wait (local)";
@@ -425,16 +444,18 @@ module Controller = struct
425444
Channel.write t (`Set_idle_wait s)
426445

427446
let listening_port (t : t) =
447+
clear_to_read () >>= fun () ->
428448
Log.debug ~tags "set_listening_port (local)";
429449
Channel.write t `Listening_port;
430450
Channel.read t >>| function
431-
| `Listening_port_resp resp -> resp
451+
| `Listening_port_resp resp -> signal_read (); resp
432452

433453
let client_addr_port (t : t) sw_id =
454+
clear_to_read () >>= fun () ->
434455
Log.debug ~tags "client_addr_port (local)";
435456
Channel.write t (`Client_addr_port sw_id);
436457
Channel.read t >>| function
437-
| `Client_addr_port_resp resp -> resp
458+
| `Client_addr_port_resp resp -> signal_read (); resp
438459

439460
let send_to_all (t : t) msg =
440461
Log.debug ~tags "send_to_all (local)";
@@ -445,10 +466,11 @@ module Controller = struct
445466
Channel.write t (`Send_ignore_errors (sw_id, msg))
446467

447468
let has_client_id (t : t) sw_id =
469+
clear_to_read () >>= fun () ->
448470
Log.debug ~tags "has_client_id (local)";
449471
Channel.write t (`Has_client_id sw_id);
450472
Channel.read t >>| function
451-
| `Has_client_id_resp resp -> resp
473+
| `Has_client_id_resp resp -> signal_read (); resp
452474

453475
let close (t : t) sw_id =
454476
Log.debug ~tags "close (local)";
@@ -477,10 +499,11 @@ module Controller = struct
477499
c
478500

479501
let send (t : t) sw_id msg =
502+
clear_to_read () >>= fun () ->
480503
Log.debug ~tags "send (local)";
481504
Channel.write t (`Send (sw_id, msg));
482505
Channel.read t >>| function
483-
| `Send_resp resp -> resp
506+
| `Send_resp resp -> signal_read (); resp
484507

485508
let channel_transfer chan writer =
486509
Deferred.forever () (fun _ -> Channel.read chan >>=
@@ -490,22 +513,26 @@ module Controller = struct
490513
Channel.write t `Listen;
491514
let reader,writer = Pipe.create () in
492515
don't_wait_for (
516+
clear_to_read () >>= fun () ->
493517
Log.debug ~tags "About to listen for listen_resp";
494518
Channel.read t >>| function
495519
| `Listen_resp chan -> Log.debug ~tags "Listen channel returned (local)";
520+
signal_read ();
496521
Channel.write chan `Ready;
497522
channel_transfer chan writer);
498523
reader
499524

500525
let barrier (t : t) sw_id =
526+
clear_to_read () >>= fun () ->
501527
Log.debug ~tags "barrier (local)";
502528
Channel.write t (`Barrier sw_id);
503529
Channel.read t >>| function
504-
| `Barrier_resp resp -> resp
530+
| `Barrier_resp resp -> signal_read (); resp
505531

506532
let individual_stats ?(pattern=C.match_all) (t : t) sw_id =
533+
clear_to_read () >>= fun () ->
507534
Log.debug ~tags "individual_stats (local)";
508535
Channel.write t (`Individual_stats (pattern, sw_id));
509536
Channel.read t >>| function
510-
| `Individual_stats_resp resp -> resp
537+
| `Individual_stats_resp resp -> signal_read (); resp
511538
end

async/Async_OpenFlow_Log.ml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
open Core.Std
22
open Async.Std
33

4-
module Log = Log.Make_global ()
5-
include Log
4+
(* module Log = Log.Make_global () *)
5+
(* include Log *)
6+
module Log = Log.Global
7+
include Log

0 commit comments

Comments
 (0)