@@ -32,6 +32,7 @@ package flow
32
32
import (
33
33
"net"
34
34
"os"
35
+ "os/signal"
35
36
"runtime"
36
37
"sync/atomic"
37
38
"time"
@@ -48,7 +49,7 @@ var createdPorts []port
48
49
var portPair map [types.IPv4Address ](* port )
49
50
var schedState * scheduler
50
51
var vEach [10 ][vBurstSize ]uint8
51
- var devices map [string ]int
52
+ var ioDevices map [string ]interface {}
52
53
53
54
type Timer struct {
54
55
t * time.Ticker
@@ -176,6 +177,19 @@ func addOSReceiver(socket int, out low.Rings) {
176
177
schedState .addFF ("OS receiver" , nil , recvOS , nil , par , nil , sendReceiveKNI , 0 , & par .stats )
177
178
}
178
179
180
+ type receiveXDPParameters struct {
181
+ out low.Rings
182
+ socket low.XDPSocket
183
+ stats common.RXTXStats
184
+ }
185
+
186
+ func addXDPReceiver (socket low.XDPSocket , out low.Rings ) {
187
+ par := new (receiveXDPParameters )
188
+ par .socket = socket
189
+ par .out = out
190
+ schedState .addFF ("AF_XDP receiver" , nil , recvXDP , nil , par , nil , sendReceiveKNI , 0 , & par .stats )
191
+ }
192
+
179
193
type KNIParameters struct {
180
194
in low.Rings
181
195
out low.Rings
@@ -269,6 +283,19 @@ func addSenderOS(socket int, in low.Rings, inIndexNumber int32) {
269
283
schedState .addFF ("sender OS" , nil , sendOS , nil , par , nil , sendReceiveKNI , inIndexNumber , & par .stats )
270
284
}
271
285
286
+ type sendXDPParameters struct {
287
+ in low.Rings
288
+ socket low.XDPSocket
289
+ stats common.RXTXStats
290
+ }
291
+
292
+ func addSenderXDP (socket low.XDPSocket , in low.Rings , inIndexNumber int32 ) {
293
+ par := new (sendXDPParameters )
294
+ par .socket = socket
295
+ par .in = in
296
+ schedState .addFF ("AF_XDP sender" , nil , sendXDP , nil , par , nil , sendReceiveKNI , inIndexNumber , & par .stats )
297
+ }
298
+
272
299
type copyParameters struct {
273
300
in low.Rings
274
301
out low.Rings
@@ -439,7 +466,7 @@ const reportMbits = false
439
466
440
467
var sizeMultiplier uint
441
468
var schedTime uint
442
- var hwtxchecksum , hwrxpacketstimestamp bool
469
+ var hwtxchecksum , hwrxpacketstimestamp , setSIGINTHandler bool
443
470
var maxRecv int
444
471
445
472
type port struct {
@@ -538,6 +565,11 @@ type Config struct {
538
565
// Enables hardware assisted timestamps in packet mbufs. These
539
566
// timestamps can be accessed with GetPacketTimestamp function.
540
567
HWRXPacketsTimestamp bool
568
+ // Disable setting custom handler for SIGINT in
569
+ // SystemStartScheduler. When handler is enabled
570
+ // SystemStartScheduler waits for SIGINT notification and calls
571
+ // SystemStop after it. It is enabled by default.
572
+ NoSetSIGINTHandler bool
541
573
}
542
574
543
575
// SystemInit is initialization of system. This function should be always called before graph construction.
@@ -665,7 +697,7 @@ func SystemInit(args *Config) error {
665
697
}
666
698
}
667
699
portPair = make (map [types.IPv4Address ](* port ))
668
- devices = make (map [string ]int )
700
+ ioDevices = make (map [string ]interface {} )
669
701
// Init scheduler
670
702
common .LogTitle (common .Initialization , "------------***------ Initializing scheduler -----***------------" )
671
703
StopRing := low .CreateRings (burstSize * sizeMultiplier , maxInIndex /* Maximum possible rings */ )
@@ -690,6 +722,8 @@ func SystemInit(args *Config) error {
690
722
vEach [i ][j ] = uint8 (i )
691
723
}
692
724
}
725
+
726
+ setSIGINTHandler = ! args .NoSetSIGINTHandler
693
727
return nil
694
728
}
695
729
@@ -723,7 +757,19 @@ func SystemStartScheduler() error {
723
757
return common .WrapWithNFError (err , "scheduler start failed" , common .Fail )
724
758
}
725
759
common .LogTitle (common .Initialization , "------------***---------- NFF-GO Started ---------***------------" )
726
- schedState .schedule (schedTime )
760
+
761
+ if setSIGINTHandler {
762
+ signalChan := make (chan os.Signal , 1 )
763
+ signal .Notify (signalChan , os .Interrupt )
764
+ go func () {
765
+ schedState .schedule (schedTime )
766
+ }()
767
+ <- signalChan
768
+ common .LogTitle (common .Debug , "Received an interrupt, stopping everything" )
769
+ SystemStop ()
770
+ } else {
771
+ schedState .schedule (schedTime )
772
+ }
727
773
return nil
728
774
}
729
775
@@ -819,13 +865,14 @@ func SetReceiver(portId uint16) (OUT *Flow, err error) {
819
865
// Gets name of device, will return error if can't initialize socket.
820
866
// Creates RAW socket, returns new opened flow with received packets.
821
867
func SetReceiverOS (device string ) (* Flow , error ) {
822
- socketID , ok := devices [device ]
868
+ v , ok := ioDevices [device ]
869
+ socketID := v .(int )
823
870
if ! ok {
824
871
socketID = low .InitDevice (device )
825
872
if socketID == - 1 {
826
873
return nil , common .WrapWithNFError (nil , "Can't initialize socket" , common .BadSocket )
827
874
}
828
- devices [device ] = socketID
875
+ ioDevices [device ] = socketID
829
876
}
830
877
rings := low .CreateRings (burstSize * sizeMultiplier , 1 )
831
878
addOSReceiver (socketID , rings )
@@ -839,18 +886,57 @@ func SetSenderOS(IN *Flow, device string) error {
839
886
if err := checkFlow (IN ); err != nil {
840
887
return err
841
888
}
842
- socketID , ok := devices [device ]
889
+ v , ok := ioDevices [device ]
890
+ socketID := v .(int )
843
891
if ! ok {
844
892
socketID = low .InitDevice (device )
845
893
if socketID == - 1 {
846
894
return common .WrapWithNFError (nil , "Can't initialize socket" , common .BadSocket )
847
895
}
848
- devices [device ] = socketID
896
+ ioDevices [device ] = socketID
849
897
}
850
898
addSenderOS (socketID , finishFlow (IN ), IN .inIndexNumber )
851
899
return nil
852
900
}
853
901
902
+ // SetReceiverXDP adds function receive from Linux AF_XDP to flow graph.
903
+ // Gets name of device and queue number, will return error if can't initialize socket.
904
+ // Creates AF_XDP socket, returns new opened flow with received packets.
905
+ func SetReceiverXDP (device string , queue int ) (* Flow , error ) {
906
+ _ , ok := ioDevices [device ]
907
+ if ok {
908
+ return nil , common .WrapWithNFError (nil , "Device shouldn't have any sockets before AF_XDP. AF_XDP Send and Receive for one device in forbidden now" , common .BadSocket )
909
+ }
910
+ socketID := low .InitXDP (device , queue )
911
+ if socketID == nil {
912
+ return nil , common .WrapWithNFError (nil , "Can't initialize AF_XDP socket" , common .BadSocket )
913
+ }
914
+ ioDevices [device ] = socketID
915
+ rings := low .CreateRings (burstSize * sizeMultiplier , 1 )
916
+ addXDPReceiver (socketID , rings )
917
+ return newFlow (rings , 1 ), nil
918
+ }
919
+
920
+ // SetSenderXDP adds function send from flow graph to Linux AF_XDP interface.
921
+ // Gets name of device, will return error if can't initialize socket.
922
+ // Creates RAW socket, sends packets, closes input flow.
923
+ func SetSenderXDP (IN * Flow , device string ) error {
924
+ if err := checkFlow (IN ); err != nil {
925
+ return err
926
+ }
927
+ _ , ok := ioDevices [device ]
928
+ if ok {
929
+ return common .WrapWithNFError (nil , "Device shouldn't have any sockets before AF_XDP. AF_XDP Send and Receive for one device in forbidden now" , common .BadSocket )
930
+ }
931
+ socketID := low .InitXDP (device , 0 )
932
+ if socketID == nil {
933
+ return common .WrapWithNFError (nil , "Can't initialize AF_XDP socket" , common .BadSocket )
934
+ }
935
+ ioDevices [device ] = socketID
936
+ addSenderXDP (socketID , finishFlow (IN ), IN .inIndexNumber )
937
+ return nil
938
+ }
939
+
854
940
// SetReceiverKNI adds function receive from KNI to flow graph.
855
941
// Gets KNI device from which packets will be received.
856
942
// Receive queue will be added to port automatically.
@@ -1418,6 +1504,11 @@ func recvOS(parameters interface{}, inIndex []int32, flag *int32, coreID int) {
1418
1504
low .ReceiveOS (srp .socket , srp .out [0 ], flag , coreID , & srp .stats )
1419
1505
}
1420
1506
1507
+ func recvXDP (parameters interface {}, inIndex []int32 , flag * int32 , coreID int ) {
1508
+ srp := parameters .(* receiveXDPParameters )
1509
+ low .ReceiveXDP (srp .socket , srp .out [0 ], flag , coreID , & srp .stats )
1510
+ }
1511
+
1421
1512
func processKNI (parameters interface {}, inIndex []int32 , flag * int32 , coreID int ) {
1422
1513
srk := parameters .(* KNIParameters )
1423
1514
if srk .linuxCore == true {
@@ -1607,6 +1698,11 @@ func sendOS(parameters interface{}, inIndex []int32, flag *int32, coreID int) {
1607
1698
low .SendOS (srp .socket , srp .in , flag , coreID , & srp .stats )
1608
1699
}
1609
1700
1701
+ func sendXDP (parameters interface {}, inIndex []int32 , flag * int32 , coreID int ) {
1702
+ srp := parameters .(* sendXDPParameters )
1703
+ low .SendXDP (srp .socket , srp .in , flag , coreID , & srp .stats )
1704
+ }
1705
+
1610
1706
func merge (from low.Rings , to low.Rings ) {
1611
1707
// We should change out rings in all flow functions which we added before
1612
1708
// and change them to one "after merge" ring.
@@ -1620,6 +1716,10 @@ func merge(from low.Rings, to low.Rings) {
1620
1716
if parameters .out [0 ] == from [0 ] {
1621
1717
parameters .out = to
1622
1718
}
1719
+ case * receiveOSParameters :
1720
+ if parameters .out [0 ] == from [0 ] {
1721
+ parameters .out = to
1722
+ }
1623
1723
case * generateParameters :
1624
1724
if parameters .out [0 ] == from [0 ] {
1625
1725
parameters .out = to
0 commit comments