@@ -73,7 +73,7 @@ module AMQProxy
7373 when AMQ ::Protocol ::Frame ::Heartbeat then send frame
7474 when AMQ ::Protocol ::Frame ::Connection ::Close
7575 Log .error { " Upstream closed connection: #{ frame.reply_text } #{ frame.reply_code } " }
76- close_all_client_channels (frame.reply_code, frame.reply_text)
76+ close_all_downstream_client_connections (frame.reply_code, frame.reply_text)
7777 begin
7878 send AMQ ::Protocol ::Frame ::Connection ::CloseOk .new
7979 rescue WriteError
@@ -106,29 +106,27 @@ module AMQProxy
106106 Log .info { " Connection error #{ ex.inspect } " } unless socket.closed?
107107 ensure
108108 socket.close rescue nil
109- close_all_client_channels
109+ close_all_downstream_client_connections
110110 end
111111
112112 def closed ?
113113 @socket .closed?
114114 end
115115
116- private def close_all_client_channels (code = 500 _u16 , reason = " UPSTREAM_ERROR" )
116+ private def close_all_downstream_client_connections (code = 500 _u16 , reason = " UPSTREAM_ERROR" )
117117 clients = Set (Client ).new
118118 @channels_lock .synchronize do
119119 return if @channels .empty?
120- Log .debug { " Upstream connection closed, closing #{ @channels .size} client channels" }
121- @channels .each_value do |downstream_channel |
122- clients << downstream_channel.client
120+ @channels .each_value do |downstream_connection |
121+ clients << downstream_connection.client
122+ end
123+
124+ clients.each do |client |
125+ Log .debug { " Closing client connection due to upstream failure. Client: #{ client } " }
126+ client.close_connection(code, reason)
123127 end
124128 @channels .clear
125129 end
126-
127- # Close all client connections that were using this upstream
128- clients.each do |client |
129- Log .debug { " Closing client connection due to upstream failure" }
130- client.close_connection(code, reason)
131- end
132130 end
133131
134132 private def send_to_all_clients (frame : AMQ ::Protocol ::Frame ::Connection )
0 commit comments