Skip to content

Commit 2dcd76c

Browse files
zhangenyaozey1996
zhangenyao
authored andcommitted
add timeout handler and max rtt option
you can set MaxRtt and OnTimeout func Ontime func while be call when a request was not answered within a specified time Signed-off-by: zhangenyao <[email protected]>
1 parent 23b417c commit 2dcd76c

File tree

5 files changed

+227
-24
lines changed

5 files changed

+227
-24
lines changed

cmd/ping/ping.go

+5
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ Examples:
3838

3939
func main() {
4040
timeout := flag.Duration("t", time.Second*100000, "")
41+
maxRtt := flag.Duration("mr", time.Second*3, "")
4142
interval := flag.Duration("i", time.Second, "")
4243
count := flag.Int("c", -1, "")
4344
size := flag.Int("s", 24, "")
@@ -84,11 +85,15 @@ func main() {
8485
fmt.Printf("round-trip min/avg/max/stddev = %v/%v/%v/%v\n",
8586
stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt)
8687
}
88+
pinger.OnTimeOut = func(packet *probing.Packet) {
89+
fmt.Println("timeout", packet.Addr, packet.Rtt, packet.TTL)
90+
}
8791

8892
pinger.Count = *count
8993
pinger.Size = *size
9094
pinger.Interval = *interval
9195
pinger.Timeout = *timeout
96+
pinger.MaxRtt = *maxRtt
9297
pinger.TTL = *ttl
9398
pinger.SetPrivileged(*privileged)
9499

ping.go

+46-20
Original file line numberDiff line numberDiff line change
@@ -92,24 +92,22 @@ var (
9292
func New(addr string) *Pinger {
9393
r := rand.New(rand.NewSource(getSeed()))
9494
firstUUID := uuid.New()
95-
var firstSequence = map[uuid.UUID]map[int]struct{}{}
96-
firstSequence[firstUUID] = make(map[int]struct{})
9795
return &Pinger{
9896
Count: -1,
9997
Interval: time.Second,
10098
RecordRtts: true,
10199
Size: timeSliceLength + trackerLength,
102100
Timeout: time.Duration(math.MaxInt64),
101+
MaxRtt: time.Duration(math.MaxInt64),
103102

104103
addr: addr,
105104
done: make(chan interface{}),
106105
id: r.Intn(math.MaxUint16),
107-
trackerUUIDs: []uuid.UUID{firstUUID},
108106
ipaddr: nil,
109107
ipv4: false,
110108
network: "ip",
111109
protocol: "udp",
112-
awaitingSequences: firstSequence,
110+
awaitingSequences: newSeqMap(firstUUID),
113111
TTL: 64,
114112
logger: StdLogger{Logger: log.New(log.Writer(), log.Prefix(), log.Flags())},
115113
}
@@ -129,6 +127,9 @@ type Pinger struct {
129127
// Timeout specifies a timeout before ping exits, regardless of how many
130128
// packets have been received.
131129
Timeout time.Duration
130+
// MaxRtt If no response is received after this time, OnTimeout is called
131+
// important! This option is not guaranteed. and if we receive the packet that was timeout, the function OnDuplicateRecv will be called
132+
MaxRtt time.Duration
132133

133134
// Count tells pinger to stop after sending (and receiving) Count echo
134135
// packets. If this option is not specified, pinger will operate until
@@ -183,6 +184,8 @@ type Pinger struct {
183184
// OnRecvError is called when an error occurs while Pinger attempts to receive a packet
184185
OnRecvError func(error)
185186

187+
// OnTimeOut is called when a packet don't have received after MaxRtt.
188+
OnTimeOut func(*Packet)
186189
// Size of packet being sent
187190
Size int
188191

@@ -205,14 +208,11 @@ type Pinger struct {
205208
// df when true sets the do-not-fragment bit in the outer IP or IPv6 header
206209
df bool
207210

208-
// trackerUUIDs is the list of UUIDs being used for sending packets.
209-
trackerUUIDs []uuid.UUID
210-
211211
ipv4 bool
212212
id int
213213
sequence int
214214
// awaitingSequences are in-flight sequence numbers we keep track of to help remove duplicate receipts
215-
awaitingSequences map[uuid.UUID]map[int]struct{}
215+
awaitingSequences seqMap
216216
// network is one of "ip", "ip4", or "ip6".
217217
network string
218218
// protocol is "icmp" or "udp".
@@ -520,20 +520,48 @@ func (p *Pinger) runLoop(
520520

521521
timeout := time.NewTicker(p.Timeout)
522522
interval := time.NewTicker(p.Interval)
523+
timeoutTimer := time.NewTimer(time.Duration(math.MaxInt64))
524+
skip := false
523525
defer func() {
524526
interval.Stop()
525527
timeout.Stop()
528+
timeoutTimer.Stop()
526529
}()
527530

528531
if err := p.sendICMP(conn); err != nil {
529532
return err
530533
}
531534

532535
for {
536+
if !skip && !timeoutTimer.Stop() {
537+
<-timeoutTimer.C
538+
}
539+
skip = false
540+
first := p.awaitingSequences.peekFirst()
541+
if first != nil {
542+
timeoutTimer.Reset(time.Until(first.time.Add(p.MaxRtt)))
543+
} else {
544+
timeoutTimer.Reset(time.Duration(math.MaxInt64))
545+
}
546+
533547
select {
534548
case <-p.done:
535549
return nil
536550

551+
case <-timeoutTimer.C:
552+
skip = true
553+
p.awaitingSequences.removeElem(first)
554+
if p.OnTimeOut != nil {
555+
inPkt := &Packet{
556+
IPAddr: p.ipaddr,
557+
Addr: p.addr,
558+
Rtt: p.MaxRtt,
559+
Seq: first.seq,
560+
TTL: -1,
561+
ID: p.id,
562+
}
563+
p.OnTimeOut(inPkt)
564+
}
537565
case <-timeout.C:
538566
return nil
539567

@@ -680,18 +708,15 @@ func (p *Pinger) getPacketUUID(pkt []byte) (*uuid.UUID, error) {
680708
if err != nil {
681709
return nil, fmt.Errorf("error decoding tracking UUID: %w", err)
682710
}
683-
684-
for _, item := range p.trackerUUIDs {
685-
if item == packetUUID {
686-
return &packetUUID, nil
687-
}
711+
if p.awaitingSequences.checkUUIDExist(packetUUID) {
712+
return &packetUUID, nil
688713
}
689714
return nil, nil
690715
}
691716

692717
// getCurrentTrackerUUID grabs the latest tracker UUID.
693718
func (p *Pinger) getCurrentTrackerUUID() uuid.UUID {
694-
return p.trackerUUIDs[len(p.trackerUUIDs)-1]
719+
return p.awaitingSequences.getCurUuID()
695720
}
696721

697722
func (p *Pinger) processPacket(recv *packet) error {
@@ -744,15 +769,16 @@ func (p *Pinger) processPacket(recv *packet) error {
744769
inPkt.Rtt = receivedAt.Sub(timestamp)
745770
inPkt.Seq = pkt.Seq
746771
// If we've already received this sequence, ignore it.
747-
if _, inflight := p.awaitingSequences[*pktUUID][pkt.Seq]; !inflight {
772+
e, inflight := p.awaitingSequences.getElem(*pktUUID, pkt.Seq)
773+
if !inflight {
748774
p.PacketsRecvDuplicates++
749775
if p.OnDuplicateRecv != nil {
750776
p.OnDuplicateRecv(inPkt)
751777
}
752778
return nil
753779
}
754780
// remove it from the list of sequences we're waiting for so we don't get duplicates.
755-
delete(p.awaitingSequences[*pktUUID], pkt.Seq)
781+
p.awaitingSequences.removeElem(e)
756782
p.updateStatistics(inPkt)
757783
default:
758784
// Very bad, not sure how this can happen
@@ -777,7 +803,8 @@ func (p *Pinger) sendICMP(conn packetConn) error {
777803
if err != nil {
778804
return fmt.Errorf("unable to marshal UUID binary: %w", err)
779805
}
780-
t := append(timeToBytes(time.Now()), uuidEncoded...)
806+
now := time.Now()
807+
t := append(timeToBytes(now), uuidEncoded...)
781808
if remainSize := p.Size - timeSliceLength - trackerLength; remainSize > 0 {
782809
t = append(t, bytes.Repeat([]byte{1}, remainSize)...)
783810
}
@@ -829,13 +856,12 @@ func (p *Pinger) sendICMP(conn packetConn) error {
829856
p.OnSend(outPkt)
830857
}
831858
// mark this sequence as in-flight
832-
p.awaitingSequences[currentUUID][p.sequence] = struct{}{}
859+
p.awaitingSequences.putElem(currentUUID, p.sequence, now)
833860
p.PacketsSent++
834861
p.sequence++
835862
if p.sequence > 65535 {
836863
newUUID := uuid.New()
837-
p.trackerUUIDs = append(p.trackerUUIDs, newUUID)
838-
p.awaitingSequences[newUUID] = make(map[int]struct{})
864+
p.awaitingSequences.newSeqMap(newUUID)
839865
p.sequence = 0
840866
}
841867
break

ping_test.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ func TestProcessPacket(t *testing.T) {
2929
if err != nil {
3030
t.Fatalf("unable to marshal UUID binary: %s", err)
3131
}
32-
data := append(timeToBytes(time.Now()), uuidEncoded...)
32+
now := time.Now()
33+
data := append(timeToBytes(now), uuidEncoded...)
3334
if remainSize := pinger.Size - timeSliceLength - trackerLength; remainSize > 0 {
3435
data = append(data, bytes.Repeat([]byte{1}, remainSize)...)
3536
}
@@ -39,7 +40,8 @@ func TestProcessPacket(t *testing.T) {
3940
Seq: pinger.sequence,
4041
Data: data,
4142
}
42-
pinger.awaitingSequences[currentUUID][pinger.sequence] = struct{}{}
43+
pinger.awaitingSequences.putElem(currentUUID, pinger.sequence, now)
44+
//pinger.awaitingSequences[currentUUID][pinger.sequence] = struct{}{}
4345

4446
msg := &icmp.Message{
4547
Type: ipv4.ICMPTypeEchoReply,
@@ -598,7 +600,8 @@ func TestProcessPacket_IgnoresDuplicateSequence(t *testing.T) {
598600
if err != nil {
599601
t.Fatalf("unable to marshal UUID binary: %s", err)
600602
}
601-
data := append(timeToBytes(time.Now()), uuidEncoded...)
603+
now := time.Now()
604+
data := append(timeToBytes(now), uuidEncoded...)
602605
if remainSize := pinger.Size - timeSliceLength - trackerLength; remainSize > 0 {
603606
data = append(data, bytes.Repeat([]byte{1}, remainSize)...)
604607
}
@@ -609,7 +612,9 @@ func TestProcessPacket_IgnoresDuplicateSequence(t *testing.T) {
609612
Data: data,
610613
}
611614
// register the sequence as sent
612-
pinger.awaitingSequences[currentUUID][0] = struct{}{}
615+
616+
pinger.awaitingSequences.putElem(currentUUID, 0, now)
617+
//pinger.awaitingSequences[currentUUID][0] = struct{}{}
613618

614619
msg := &icmp.Message{
615620
Type: ipv4.ICMPTypeEchoReply,

seq_map.go

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package probing
2+
3+
import (
4+
"github.com/google/uuid"
5+
"time"
6+
)
7+
8+
type seqMap struct {
9+
curUUID uuid.UUID
10+
head *elem
11+
tail *elem
12+
uuidMap map[uuid.UUID]map[int]*elem
13+
}
14+
type elem struct {
15+
uuid uuid.UUID
16+
seq int
17+
time time.Time
18+
prev *elem
19+
next *elem
20+
}
21+
22+
func newSeqMap(u uuid.UUID) seqMap {
23+
s := seqMap{
24+
curUUID: u,
25+
head: &elem{},
26+
tail: &elem{},
27+
uuidMap: map[uuid.UUID]map[int]*elem{},
28+
}
29+
s.uuidMap[u] = make(map[int]*elem)
30+
s.head.next = s.tail
31+
s.tail.prev = s.head
32+
return s
33+
}
34+
35+
func (s seqMap) newSeqMap(u uuid.UUID) {
36+
s.curUUID = u
37+
s.uuidMap[u] = make(map[int]*elem)
38+
}
39+
40+
func (s seqMap) putElem(uuid uuid.UUID, seq int, now time.Time) {
41+
e := &elem{
42+
uuid: uuid,
43+
seq: seq,
44+
time: now,
45+
prev: s.tail.prev,
46+
next: s.tail,
47+
}
48+
s.tail.prev.next = e
49+
s.tail.prev = e
50+
s.uuidMap[uuid][seq] = e
51+
}
52+
func (s seqMap) getElem(uuid uuid.UUID, seq int) (*elem, bool) {
53+
e, ok := s.uuidMap[uuid][seq]
54+
return e, ok
55+
}
56+
func (s seqMap) removeElem(e *elem) {
57+
e.prev.next = e.next
58+
e.next.prev = e.prev
59+
if m, ok := s.uuidMap[e.uuid]; ok {
60+
delete(m, e.seq)
61+
}
62+
}
63+
func (s seqMap) peekFirst() *elem {
64+
if s.head.next == s.tail {
65+
return nil
66+
}
67+
return s.head.next
68+
}
69+
func (s seqMap) getCurUuID() uuid.UUID {
70+
return s.curUUID
71+
}
72+
73+
func (s seqMap) checkUUIDExist(u uuid.UUID) bool {
74+
_, ok := s.uuidMap[u]
75+
return ok
76+
}

0 commit comments

Comments
 (0)