55package akka .io .dns .internal
66
77import java .net .InetSocketAddress
8-
9- import scala .collection .immutable .Seq
10-
8+ import akka .actor .ActorRef
9+ import akka .actor .PoisonPill
1110import akka .actor .Props
1211import akka .io .Tcp
1312import akka .io .Tcp .{ Connected , PeerClosed , Register }
1413import akka .io .dns .{ RecordClass , RecordType }
1514import akka .io .dns .internal .DnsClient .Answer
15+ import akka .testkit .EventFilter
16+ import akka .testkit .WithLogCapturing
1617import akka .testkit .{ AkkaSpec , ImplicitSender , TestProbe }
1718
18- class TcpDnsClientSpec extends AkkaSpec with ImplicitSender {
19+ class TcpDnsClientSpec extends AkkaSpec (""" akka.loglevel = DEBUG
20+ akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]""" ) with ImplicitSender with WithLogCapturing {
1921 import TcpDnsClient ._
2022
2123 " The async TCP DNS client" should {
@@ -51,6 +53,27 @@ class TcpDnsClientSpec extends AkkaSpec with ImplicitSender {
5153 tcpExtensionProbe.expectMsg(Tcp .Connect (dnsServerAddress))
5254 }
5355
56+ " terminated if the connection terminates unexpectedly" in {
57+ val tcpExtensionProbe = TestProbe ()
58+ val answerProbe = TestProbe ()
59+
60+ val client = system.actorOf(Props (new TcpDnsClient (tcpExtensionProbe.ref, dnsServerAddress, answerProbe.ref)))
61+
62+ client ! exampleRequestMessage
63+
64+ tcpExtensionProbe.expectMsg(Tcp .Connect (dnsServerAddress))
65+ tcpExtensionProbe.lastSender ! Connected (dnsServerAddress, localAddress)
66+ expectMsgType[Register ]
67+ val registered = tcpExtensionProbe.lastSender
68+
69+ expectMsgType[Tcp .Write ]
70+
71+ answerProbe.watch(client)
72+
73+ registered ! PoisonPill
74+ answerProbe.expectTerminated(client)
75+ }
76+
5477 " accept a fragmented TCP response" in {
5578 val tcpExtensionProbe = TestProbe ()
5679 val answerProbe = TestProbe ()
@@ -72,22 +95,80 @@ class TcpDnsClientSpec extends AkkaSpec with ImplicitSender {
7295 answerProbe.expectMsg(Answer (42 , Nil ))
7396 }
7497
75- " accept merged TCP responses" in {
98+ " accept multiple fragmented TCP responses" in {
7699 val tcpExtensionProbe = TestProbe ()
77100 val answerProbe = TestProbe ()
78101
79102 val client = system.actorOf(Props (new TcpDnsClient (tcpExtensionProbe.ref, dnsServerAddress, answerProbe.ref)))
80103
81104 client ! exampleRequestMessage
82- client ! exampleRequestMessage.copy(id = 43 )
83105
84106 tcpExtensionProbe.expectMsg(Tcp .Connect (dnsServerAddress))
85107 tcpExtensionProbe.lastSender ! Connected (dnsServerAddress, localAddress)
86108 expectMsgType[Register ]
87109 val registered = tcpExtensionProbe.lastSender
88110
111+ // pretend write+ack+write happened, so three requests written, now both coming back.
112+ // (we need to make sure buffer is not reordered by sandwitched responses)
89113 expectMsgType[Tcp .Write ]
114+ val fullResponse1 = encodeLength(exampleResponseMessage.write().length) ++ exampleResponseMessage.write()
115+ val exampleResponseMessage2 = exampleResponseMessage.copy(id = 43 )
116+ val fullResponse2 = encodeLength(exampleResponseMessage2.write().length) ++ exampleResponseMessage2.write()
117+ val exampleResponseMessage3 = exampleResponseMessage.copy(id = 44 )
118+ val fullResponse3 = encodeLength(exampleResponseMessage3.write().length) ++ exampleResponseMessage3.write()
119+ registered ! Tcp .Received (fullResponse1.take(8 ))
120+ Thread .sleep(30 ) // give things some time to go wrong
121+ registered ! Tcp .Received (fullResponse1.drop(8 ) ++ fullResponse2.take(8 ))
122+ Thread .sleep(30 )
123+ registered ! Tcp .Received (fullResponse2.drop(8 ) ++ fullResponse3.take(8 ))
124+ Thread .sleep(30 )
125+ registered ! Tcp .Received (fullResponse3.drop(8 ))
126+
127+ answerProbe.expectMsg(Answer (42 , Nil ))
128+ answerProbe.expectMsg(Answer (43 , Nil ))
129+ answerProbe.expectMsg(Answer (44 , Nil ))
130+ }
131+
132+ " respect backpressure from the TCP actor" in {
133+ val tcpExtensionProbe = TestProbe ()
134+
135+ val client = system.actorOf(Props (new TcpDnsClient (tcpExtensionProbe.ref, dnsServerAddress, ActorRef .noSender)))
136+
137+ client ! exampleRequestMessage
138+ client ! exampleRequestMessage.copy(id = 43 )
139+
140+ tcpExtensionProbe.expectMsg(Tcp .Connect (dnsServerAddress))
141+ tcpExtensionProbe.lastSender ! Connected (dnsServerAddress, localAddress)
142+ expectMsgType[Register ]
143+ val registered = tcpExtensionProbe.lastSender
144+
145+ val ack = expectMsgType[Tcp .Write ].ack
146+ expectNoMessage()
147+ registered ! ack
148+
90149 expectMsgType[Tcp .Write ]
150+ }
151+
152+ " accept merged TCP responses" in {
153+ val tcpExtensionProbe = TestProbe ()
154+ val answerProbe = TestProbe ()
155+
156+ val client = system.actorOf(Props (new TcpDnsClient (tcpExtensionProbe.ref, dnsServerAddress, answerProbe.ref)))
157+
158+ client ! exampleRequestMessage
159+ client ! exampleRequestMessage.copy(id = 43 )
160+
161+ tcpExtensionProbe.expectMsg(Tcp .Connect (dnsServerAddress))
162+ tcpExtensionProbe.lastSender ! Connected (dnsServerAddress, localAddress)
163+ expectMsgType[Register ]
164+ val registered = tcpExtensionProbe.lastSender
165+
166+ var ack = expectMsgType[Tcp .Write ].ack
167+
168+ registered ! ack
169+
170+ ack = expectMsgType[Tcp .Write ].ack
171+
91172 val fullResponse =
92173 encodeLength(exampleResponseMessage.write().length) ++ exampleResponseMessage.write() ++
93174 encodeLength(exampleResponseMessage.write().length) ++ exampleResponseMessage.copy(id = 43 ).write()
@@ -97,5 +178,71 @@ class TcpDnsClientSpec extends AkkaSpec with ImplicitSender {
97178 answerProbe.expectMsg(Answer (42 , Nil ))
98179 answerProbe.expectMsg(Answer (43 , Nil ))
99180 }
181+
182+ " report its failure to the outer client" in {
183+ val tcpExtensionProbe = TestProbe ()
184+ val answerProbe = TestProbe ()
185+
186+ val failToConnectClient =
187+ system.actorOf(Props (new TcpDnsClient (tcpExtensionProbe.ref, dnsServerAddress, answerProbe.ref)))
188+
189+ failToConnectClient ! exampleRequestMessage
190+
191+ val connect = tcpExtensionProbe.expectMsg(Tcp .Connect (dnsServerAddress))
192+
193+ failToConnectClient ! Tcp .CommandFailed (connect)
194+
195+ answerProbe.expectMsg(DnsClient .TcpDropped )
196+
197+ val closesWithErrorClient =
198+ system.actorOf(Props (new TcpDnsClient (tcpExtensionProbe.ref, dnsServerAddress, answerProbe.ref)))
199+
200+ closesWithErrorClient ! exampleRequestMessage
201+
202+ tcpExtensionProbe.expectMsg(connect)
203+ tcpExtensionProbe.lastSender ! Connected (dnsServerAddress, localAddress)
204+ expectMsgType[Register ]
205+ val registered = tcpExtensionProbe.lastSender
206+
207+ registered ! Tcp .ErrorClosed (" BOOM!" )
208+
209+ answerProbe.expectMsg(DnsClient .TcpDropped )
210+ }
211+
212+ " should drop older requests when TCP connection backpressures" in {
213+ val tcpExtensionProbe = TestProbe ()
214+ val connectionProbe = TestProbe ()
215+
216+ val client = system.actorOf(Props (new TcpDnsClient (tcpExtensionProbe.ref, dnsServerAddress, ActorRef .noSender)))
217+
218+ // initial request
219+ val initialRequest = exampleRequestMessage.copy(id = 1 )
220+ client ! initialRequest
221+
222+ tcpExtensionProbe.expectMsg(Tcp .Connect (dnsServerAddress))
223+ tcpExtensionProbe.lastSender.tell(Connected (dnsServerAddress, localAddress), connectionProbe.ref)
224+ connectionProbe.expectMsgType[Register ]
225+ val registered = connectionProbe.lastSender
226+
227+ var write = connectionProbe.expectMsgType[Tcp .Write ]
228+ write.data.drop(2 ) shouldBe initialRequest.write()
229+
230+ // 1 in flight, 14 more, buffer fits 10, should drop 5 oldest (id 2 - 6)
231+ EventFilter .warning(occurrences = 5 , pattern = " Dropping oldest buffered DNS request" ).intercept {
232+ (2 to 16 ).foreach { i =>
233+ client ! exampleRequestMessage.copy(id = i.toShort)
234+ }
235+ }
236+
237+ // initial write is acked
238+ registered ! write.ack
239+
240+ // the rest of the buffered should be handled
241+ (7 to 16 ).foreach { i =>
242+ write = connectionProbe.expectMsgType[Tcp .Write ]
243+ write.data.drop(2 ) shouldBe exampleRequestMessage.copy(id = i.toShort).write()
244+ registered ! write.ack // next ack
245+ }
246+ }
100247 }
101248}
0 commit comments