-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdefragmenter.go
152 lines (123 loc) · 4.93 KB
/
defragmenter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package protean
import (
"encoding/hex"
"fmt"
"time"
)
// Cache expiration is set to 60 seconds.
const CACHE_EXPIRATION_TIME time.Duration = time.Duration(60 * 1000)
// Tracks the fragments for a single packet identifier
type PacketTracker struct {
// Indexed lists of fragments for this packet
Pieces [][]byte
// Counts of the number remaining
// This is an optimization to avoid scanning Pieces repeatedly for counts.
Counter uint8
// Stores the Timer objects for expiring each identifier
// See RFC 815, section 7, paragraph 2 (p. 8)
Timer *time.Timer
}
// The Defragmenter gathers fragmented packets in a buffer and defragments them.
// The cache expiration strategy is taken from RFC 815: IP Datagram Reassembly
// Algorithms.
type Defragmenter struct {
// Associates packet identifiers with indexed lists of fragments
// The packet identifiers are converted from []bytes to hex strings so
// that they can be used as map keys.
tracker map[string]PacketTracker
// Stores the packet identifiers for which we have all fragments
complete [][][]byte
}
// Add a fragment that has been received from the network.
// Fragments are processed according to the following logic:
// If the packet identifier is recognized:
// If we have a fragment for this index:
// This fragment is a duplicate, drop it.
// Else:
// This fragment a new fragment for an existing packet
// Else:
// This fragment a new fragment for a new packet.
func (this *Defragmenter) AddFragment(fragment *Fragment) {
// Convert []byte to hex string so that it can be used as a map key
hexid := hex.EncodeToString(fragment.Id)
if tracked, ok := this.tracker[hexid]; ok {
// A fragment for an existing packet
// Get list of fragment contents for this packet identifier
fragmentList := tracked.Pieces
if fragmentList[fragment.Index] != nil {
// Duplicate fragment
// The fragmentation system does not retransmit dropped packets.
// Therefore, a duplicate is an error.
// However, it might be a recoverable error.
// So let's log it and continue.
fmt.Println("Duplicate fragment %1: %2 / %3", hexid, fragment.Index, fragment.Count)
} else {
// New fragment for an existing packet
// Only the payload is stored explicitly.
// The other information is stored implicitly in the data structure.
fragmentList[fragment.Index] = fragment.Payload
tracked.Pieces = fragmentList
// Decrement the Counter for this packet identifier
tracked.Counter = tracked.Counter - 1
this.tracker[hexid] = tracked
// If we have all fragments for this packet identifier, it is complete.
if this.tracker[hexid].Counter == 0 {
// Extract the completed packet fragments from the tracker
this.complete = append(this.complete, this.tracker[hexid].Pieces)
// Stop the Timer now that the packet is complete
tracked.Timer.Stop()
// Delete the completed packet from the tracker
delete(this.tracker, hexid)
}
}
} else {
// A new fragment for a new packet
// Make an empty list of fragments.
fragmentList := make([][]byte, fragment.Count)
// Store this fragment in the fragment list.
fragmentList[fragment.Index] = fragment.Payload
// Set the Counter to the total number of fragments expected.
// The decrement it as we have already received one fragment.
var counter = fragment.Count - 1
if counter == 0 {
// Deal with the case where there is only one fragment for this packet.
this.complete = append(this.complete, fragmentList)
} else {
// Store time the first fragment arrived, to set the cache expiration.
// See RFC 815, section 7, paragraph 2 (p. 8)
// Cache expiration is set to 60 seconds.
var timer = time.AfterFunc(CACHE_EXPIRATION_TIME, func() { this.reap(hexid) })
// Store the fragment information in the tracker
this.tracker[hexid] = PacketTracker{Pieces: fragmentList, Counter: counter, Timer: timer}
}
}
}
// Returns the number of packets for which all fragments have arrived.
func (this *Defragmenter) CompleteCount() int {
return len(this.complete)
}
// Return an []byte for each packet where all fragments are available.
// Calling this clears the set of stored completed fragments.
func (this *Defragmenter) GetComplete() [][]byte {
var packets [][]byte
for i := 0; i < len(this.complete); i++ {
// Obtain the contents from the fragments for a completed packet
// Get the last elemnet of the list
fragmentList := this.complete[len(this.complete)-1]
// Remove the last element of the list
this.complete = this.complete[:len(this.complete)-1]
// Assemble the fragment contents into one []byte per packet
if fragmentList != nil && len(fragmentList) > 0 {
var packet []byte
for _, fragment := range fragmentList {
packet = append(packet, fragment...)
}
packets = append(packets, packet)
}
}
return packets
}
func (this *Defragmenter) reap(hexid string) {
// Remove the fragments from the cache now that the packet has expired
delete(this.tracker, hexid)
}