2323use raklib \protocol \PacketReliability ;
2424use raklib \protocol \SplitPacketInfo ;
2525use function array_fill ;
26+ use function array_push ;
27+ use function assert ;
2628use function count ;
29+ use function microtime ;
2730use function str_split ;
2831use function strlen ;
29- use function time ;
3032
3133final class SendReliabilityLayer{
3234 private const DATAGRAM_MTU_OVERHEAD = 36 + Datagram::HEADER_SIZE ; //IP header (20 bytes) + UDP header (8 bytes) + RakNet weird (8 bytes) = 36
3335 private const MIN_POSSIBLE_PACKET_SIZE_LIMIT = Session::MIN_MTU_SIZE - self ::DATAGRAM_MTU_OVERHEAD ;
36+ /**
37+ * Delay in seconds before an unacked packet is retransmitted.
38+ * TODO: Replace this with dynamic calculation based on roundtrip times (that's a complex task for another time)
39+ */
40+ private const UNACKED_RETRANSMIT_DELAY = 2.0 ;
3441
3542 /** @var EncapsulatedPacket[] */
3643 private array $ sendQueue = [];
@@ -41,12 +48,23 @@ final class SendReliabilityLayer{
4148
4249 private int $ messageIndex = 0 ;
4350
51+ private int $ reliableWindowStart ;
52+ private int $ reliableWindowEnd ;
53+ /**
54+ * @var bool[] message index => acked
55+ * @phpstan-var array<int, bool>
56+ */
57+ private array $ reliableWindow = [];
58+
4459 /** @var int[] */
4560 private array $ sendOrderedIndex ;
4661 /** @var int[] */
4762 private array $ sendSequencedIndex ;
4863
49- /** @var ReliableCacheEntry[] */
64+ /** @var EncapsulatedPacket[] */
65+ private array $ reliableBacklog = [];
66+
67+ /** @var EncapsulatedPacket[] */
5068 private array $ resendQueue = [];
5169
5270 /** @var ReliableCacheEntry[] */
@@ -60,18 +78,22 @@ final class SendReliabilityLayer{
6078
6179 /**
6280 * @phpstan-param int<Session::MIN_MTU_SIZE, max> $mtuSize
63- * @phpstan-param \Closure(Datagram) : void $sendDatagramCallback
64- * @phpstan-param \Closure(int) : void $onACK
81+ * @phpstan-param \Closure(Datagram) : void $sendDatagramCallback
82+ * @phpstan-param \Closure(int) : void $onACK
6583 */
6684 public function __construct (
6785 private int $ mtuSize ,
6886 private \Closure $ sendDatagramCallback ,
69- private \Closure $ onACK
87+ private \Closure $ onACK ,
88+ private int $ reliableWindowSize = 512 ,
7089 ){
7190 $ this ->sendOrderedIndex = array_fill (0 , PacketReliability::MAX_ORDER_CHANNELS , 0 );
7291 $ this ->sendSequencedIndex = array_fill (0 , PacketReliability::MAX_ORDER_CHANNELS , 0 );
7392
7493 $ this ->maxDatagramPayloadSize = $ this ->mtuSize - self ::DATAGRAM_MTU_OVERHEAD ;
94+
95+ $ this ->reliableWindowStart = 0 ;
96+ $ this ->reliableWindowEnd = $ this ->reliableWindowSize ;
7597 }
7698
7799 /**
@@ -102,6 +124,19 @@ public function sendQueue() : void{
102124 }
103125
104126 private function addToQueue (EncapsulatedPacket $ pk , bool $ immediate ) : void {
127+ if (PacketReliability::isReliable ($ pk ->reliability )){
128+ if ($ pk ->messageIndex === null || $ pk ->messageIndex < $ this ->reliableWindowStart ){
129+ throw new \InvalidArgumentException ("Cannot send a reliable packet with message index less than the window start ( $ pk ->messageIndex < $ this ->reliableWindowStart ) " );
130+ }
131+ if ($ pk ->messageIndex >= $ this ->reliableWindowEnd ){
132+ //If we send this now, the client's reliable window may overflow, causing the packet to need redelivery
133+ $ this ->reliableBacklog [$ pk ->messageIndex ] = $ pk ;
134+ return ;
135+ }
136+
137+ $ this ->reliableWindow [$ pk ->messageIndex ] = false ;
138+ }
139+
105140 if ($ pk ->identifierACK !== null and $ pk ->messageIndex !== null ){
106141 $ this ->needACK [$ pk ->identifierACK ][$ pk ->messageIndex ] = $ pk ->messageIndex ;
107142 }
@@ -152,6 +187,7 @@ public function addEncapsulatedToQueue(EncapsulatedPacket $packet, bool $immedia
152187 $ pk ->splitInfo = new SplitPacketInfo ($ splitID , $ count , $ bufferCount );
153188 $ pk ->reliability = $ packet ->reliability ;
154189 $ pk ->buffer = $ buffer ;
190+ $ pk ->identifierACK = $ packet ->identifierACK ;
155191
156192 if ($ pk ->reliability ->isReliable ()){
157193 $ pk ->messageIndex = $ this ->messageIndex ++;
@@ -171,11 +207,26 @@ public function addEncapsulatedToQueue(EncapsulatedPacket $packet, bool $immedia
171207 }
172208 }
173209
210+ private function updateReliableWindow () : void {
211+ while (
212+ isset ($ this ->reliableWindow [$ this ->reliableWindowStart ]) && //this messageIndex has been used
213+ $ this ->reliableWindow [$ this ->reliableWindowStart ] === true //we received an ack for this messageIndex
214+ ){
215+ unset($ this ->reliableWindow [$ this ->reliableWindowStart ]);
216+ $ this ->reliableWindowStart ++;
217+ $ this ->reliableWindowEnd ++;
218+ }
219+ }
220+
174221 public function onACK (ACK $ packet ) : void {
175222 foreach ($ packet ->packets as $ seq ){
176223 if (isset ($ this ->reliableCache [$ seq ])){
177224 foreach ($ this ->reliableCache [$ seq ]->getPackets () as $ pk ){
178- if ($ pk ->identifierACK !== null and $ pk ->messageIndex !== null ){
225+ assert ($ pk ->messageIndex !== null && $ pk ->messageIndex >= $ this ->reliableWindowStart && $ pk ->messageIndex < $ this ->reliableWindowEnd );
226+ $ this ->reliableWindow [$ pk ->messageIndex ] = true ;
227+ $ this ->updateReliableWindow ();
228+
229+ if ($ pk ->identifierACK !== null ){
179230 unset($ this ->needACK [$ pk ->identifierACK ][$ pk ->messageIndex ]);
180231 if (count ($ this ->needACK [$ pk ->identifierACK ]) === 0 ){
181232 unset($ this ->needACK [$ pk ->identifierACK ]);
@@ -191,8 +242,9 @@ public function onACK(ACK $packet) : void{
191242 public function onNACK (NACK $ packet ) : void {
192243 foreach ($ packet ->packets as $ seq ){
193244 if (isset ($ this ->reliableCache [$ seq ])){
194- //TODO: group resends if the resulting datagram is below the MTU
195- $ this ->resendQueue [] = $ this ->reliableCache [$ seq ];
245+ foreach ($ this ->reliableCache [$ seq ]->getPackets () as $ pk ){
246+ $ this ->resendQueue [] = $ pk ;
247+ }
196248 unset($ this ->reliableCache [$ seq ]);
197249 }
198250 }
@@ -201,34 +253,42 @@ public function onNACK(NACK $packet) : void{
201253 public function needsUpdate () : bool {
202254 return (
203255 count ($ this ->sendQueue ) !== 0 or
256+ count ($ this ->reliableBacklog ) !== 0 or
204257 count ($ this ->resendQueue ) !== 0 or
205258 count ($ this ->reliableCache ) !== 0
206259 );
207260 }
208261
209262 public function update () : void {
210- if (count ($ this ->resendQueue ) > 0 ){
211- $ limit = 16 ;
212- foreach ($ this ->resendQueue as $ k => $ pk ){
213- $ this ->sendDatagram ($ pk ->getPackets ());
214- unset($ this ->resendQueue [$ k ]);
215-
216- if (--$ limit <= 0 ){
217- break ;
218- }
263+ $ retransmitOlderThan = microtime (true ) - self ::UNACKED_RETRANSMIT_DELAY ;
264+ foreach ($ this ->reliableCache as $ seq => $ pk ){
265+ if ($ pk ->getTimestamp () < $ retransmitOlderThan ){
266+ //behave as if a NACK was received
267+ array_push ($ this ->resendQueue , ...$ pk ->getPackets ());
268+ unset($ this ->reliableCache [$ seq ]);
269+ }else {
270+ break ;
219271 }
272+ }
220273
221- if (count ($ this ->resendQueue ) > ReceiveReliabilityLayer::$ WINDOW_SIZE ){
222- $ this ->resendQueue = [];
274+ if (count ($ this ->resendQueue ) > 0 ){
275+ foreach ($ this ->resendQueue as $ pk ){
276+ //resends should always be within the reliable window
277+ $ this ->addToQueue ($ pk , false );
223278 }
279+ $ this ->resendQueue = [];
224280 }
225281
226- foreach ($ this ->reliableCache as $ seq => $ pk ){
227- if ($ pk ->getTimestamp () < (time () - 8 )){
228- $ this ->resendQueue [] = $ pk ;
229- unset($ this ->reliableCache [$ seq ]);
230- }else {
231- break ;
282+ if (count ($ this ->reliableBacklog ) > 0 ){
283+ foreach ($ this ->reliableBacklog as $ k => $ pk ){
284+ assert ($ pk ->messageIndex !== null && $ pk ->messageIndex >= $ this ->reliableWindowStart );
285+ if ($ pk ->messageIndex >= $ this ->reliableWindowEnd ){
286+ //we can't send this packet yet, the client's reliable window will drop it
287+ break ;
288+ }
289+
290+ $ this ->addToQueue ($ pk , false );
291+ unset($ this ->reliableBacklog [$ k ]);
232292 }
233293 }
234294
0 commit comments