@@ -54,11 +54,70 @@ type clientPlaySessionHandler struct {
5454
5555 mu struct {
5656 sync.RWMutex
57- loginPluginMessages deque.Deque [* plugin.Message ]
58- serverBossBars map [uuid.UUID ]struct {}
57+ loginPluginMessages deque.Deque [* plugin.Message ]
58+ loginPluginMessagesBytes int
59+ loginPluginMessagesOverflowed bool
60+ serverBossBars map [uuid.UUID ]struct {}
5961 }
6062}
6163
64+ // Caps on the pre-join plugin-message queue (loginPluginMessages). A client that
65+ // never completes its login/FML handshake phase could otherwise queue plugin
66+ // messages without bound. The count/byte limits mirror Velocity's.
67+ const (
68+ maxQueuedLoginPluginMessages = 1024
69+ maxQueuedLoginPluginMessageBytes = 4 * 1024 * 1024 // 4 MiB
70+ )
71+
72+ // enqueueLoginPluginMessage queues a plugin message received before the player
73+ // has joined a server, enforcing per-connection count and byte caps. It returns
74+ // true if the message was queued, or false if a cap was exceeded — in which case
75+ // the queue is dropped, the player disconnected, and the overflow latched so
76+ // subsequent messages are rejected without further work.
77+ func (c * clientPlaySessionHandler ) enqueueLoginPluginMessage (msg * plugin.Message ) bool {
78+ c .mu .Lock ()
79+ if c .mu .loginPluginMessagesOverflowed {
80+ c .mu .Unlock ()
81+ return false
82+ }
83+ newBytes := c .mu .loginPluginMessagesBytes + len (msg .Data )
84+ newCount := c .mu .loginPluginMessages .Len () + 1
85+ if newBytes > maxQueuedLoginPluginMessageBytes || newCount > maxQueuedLoginPluginMessages {
86+ c .mu .loginPluginMessagesOverflowed = true
87+ c .mu .loginPluginMessages .Clear ()
88+ c .mu .loginPluginMessagesBytes = 0
89+ c .mu .Unlock ()
90+ c .log .Info ("disconnecting player: pre-join plugin message queue exceeded its limits" ,
91+ "messages" , newCount , "bytes" , newBytes )
92+ c .player .Disconnect (& component.Text {
93+ Content : "Too many plugin messages were sent before joining a server" ,
94+ })
95+ return false
96+ }
97+ c .mu .loginPluginMessages .PushBack (msg )
98+ c .mu .loginPluginMessagesBytes = newBytes
99+ c .mu .Unlock ()
100+ return true
101+ }
102+
103+ // drainQueuedLoginPluginMessages removes and returns all queued pre-join plugin
104+ // messages, resetting the byte counter so the queue caps start fresh. Returns
105+ // nil if the queue is empty.
106+ func (c * clientPlaySessionHandler ) drainQueuedLoginPluginMessages () []* plugin.Message {
107+ c .mu .Lock ()
108+ defer c .mu .Unlock ()
109+ c .mu .loginPluginMessagesBytes = 0
110+ n := c .mu .loginPluginMessages .Len ()
111+ if n == 0 {
112+ return nil
113+ }
114+ msgs := make ([]* plugin.Message , 0 , n )
115+ for c .mu .loginPluginMessages .Len () != 0 {
116+ msgs = append (msgs , c .mu .loginPluginMessages .PopFront ())
117+ }
118+ return msgs
119+ }
120+
62121func newClientPlaySessionHandler (player * connectedPlayer ) * clientPlaySessionHandler {
63122 log := player .log .WithName ("clientPlaySession" )
64123 h := & clientPlaySessionHandler {
@@ -128,6 +187,8 @@ func (c *clientPlaySessionHandler) Deactivated() {
128187 c .mu .Lock ()
129188 c .player .discardChatQueue ()
130189 c .mu .loginPluginMessages .Clear ()
190+ c .mu .loginPluginMessagesBytes = 0
191+ c .mu .loginPluginMessagesOverflowed = false
131192 c .mu .Unlock ()
132193}
133194
@@ -177,10 +238,7 @@ func (c *clientPlaySessionHandler) FlushQueuedPluginMessages() {
177238 if ! ok {
178239 return
179240 }
180- c .mu .Lock ()
181- defer c .mu .Unlock ()
182- for c .mu .loginPluginMessages .Len () != 0 {
183- pm := c .mu .loginPluginMessages .PopFront ()
241+ for _ , pm := range c .drainQueuedLoginPluginMessages () {
184242 _ = serverMc .BufferPacket (pm )
185243 }
186244 _ = serverMc .Flush ()
@@ -232,16 +290,30 @@ func forwardKeepAlive(p *packet.KeepAlive, player *connectedPlayer) {
232290}
233291
234292func sendKeepAliveToBackend (serverConn * serverConnection , player * connectedPlayer , p * packet.KeepAlive ) bool {
235- if serverConn != nil {
236- if sentTime , ok := serverConn .pendingPings .Get (p .RandomID ); ok {
237- serverConn .pendingPings .Delete (p .RandomID )
238- if serverMc := serverConn .conn (); serverMc != nil {
239- player .ping .Store (time .Since (sentTime ))
240- return serverMc .WritePacket (p ) == nil
241- }
242- }
293+ if serverConn == nil {
294+ return false
243295 }
244- return false
296+ sentTime , ok := serverConn .pendingPings .Get (p .RandomID )
297+ if ! ok {
298+ return false
299+ }
300+ // We removed this pending ping, so it is ours: consume it regardless of
301+ // whether it can be forwarded, so it is not re-dispatched to another
302+ // connection.
303+ serverConn .pendingPings .Delete (p .RandomID )
304+
305+ // Only forward to the backend when it is open and in the same protocol state
306+ // as the client. During a server switch the backend may be in CONFIG while
307+ // the client is still in PLAY; forwarding then would mis-encode the packet.
308+ serverMc := serverConn .conn ()
309+ clientState := player .State ()
310+ if serverMc != nil && ! netmc .Closed (serverMc ) &&
311+ clientState == serverMc .State () &&
312+ (clientState == state .Config || clientState == state .Play ) {
313+ player .ping .Store (time .Since (sentTime ))
314+ _ = serverMc .WritePacket (p )
315+ }
316+ return true
245317}
246318
247319func (c * clientPlaySessionHandler ) handlePluginMessage (packet * plugin.Message ) {
@@ -334,10 +406,9 @@ func (c *clientPlaySessionHandler) handlePluginMessage(packet *plugin.Message) {
334406 // JoinGame packet has been received by the proxy, whichever comes first.
335407 //
336408 // We also need to make sure to retain these packets, so they can be flushed
337- // appropriately.
338- c .mu .Lock ()
339- c .mu .loginPluginMessages .PushBack (packet )
340- c .mu .Unlock ()
409+ // appropriately. The queue is bounded to prevent a client that never
410+ // completes its handshake phase from growing it without limit.
411+ c .enqueueLoginPluginMessage (packet )
341412 }
342413}
343414
@@ -483,8 +554,7 @@ func (c *clientPlaySessionHandler) handleBackendJoinGame(pc *proto.PacketContext
483554 }
484555
485556 // If we had plugin messages queued during login/FML handshake, send them now.
486- for c .mu .loginPluginMessages .Len () != 0 {
487- pm := c .mu .loginPluginMessages .PopFront ()
557+ for _ , pm := range c .drainQueuedLoginPluginMessages () {
488558 if err = serverMc .BufferPacket (pm ); err != nil {
489559 return fmt .Errorf ("error buffering %T for backend: %w" , pm , err )
490560 }
0 commit comments