@@ -289,106 +289,3 @@ def factory(address, timeout_ms=10_000, interceptors=()):
289289
290290 monkeypatch .setattr (client_mod , "LogServiceClientSync" , factory )
291291 return created
292-
293-
def _entry(data="line"):
    """Return a test ``LogEntry`` carrying *data*.

    The entry is built with source ``"test"`` and INFO level, and its
    timestamp is pinned to ``epoch_ms = 1`` so every entry built here has
    the same timestamp.
    """
    entry = logging_pb2.LogEntry(
        source="test",
        data=data,
        level=logging_pb2.LOG_LEVEL_INFO,
    )
    entry.timestamp.epoch_ms = 1
    return entry
298-
299-
@pytest.mark.parametrize(
    "scenario",
    [
        # The common log-server failover case: the RPC fails with a
        # retryable error and the resolver returns a different address on
        # the retry.
        "retryable_with_resolver_failover",
        # Retryable RPC failure with a fixed URL: the RPC client is rebuilt
        # so that a stuck TCP connection can heal.
        "retryable_static_url",
        # The resolver itself raises on its first call; the entry is
        # rebuffered and the next resolver call succeeds.
        "resolver_raises",
        # Non-retryable RPC error: the entry is rebuffered (no drops) but
        # the cached client is NOT invalidated.
        "non_retryable",
    ],
)
def test_failures_always_deliver_via_retry(monkeypatch, scenario):
    """Every send-failure path must still deliver the entry via the retry loop.

    The delivery guarantee is asserted for all failure kinds. After that,
    scenario-specific checks cover the side effects: whether the first
    client was invalidated or kept, and how many clients were built.
    """
    built: list[_FakeLogServiceClient] = []

    def factory(address, timeout_ms=10_000, interceptors=()):
        # Record every fake client the pusher constructs, in creation order.
        client = _FakeLogServiceClient(address, timeout_ms=timeout_ms, interceptors=interceptors)
        built.append(client)
        return client

    monkeypatch.setattr(client_mod, "LogServiceClientSync", factory)

    # Pick the target URL and optional resolver for the scenario; the pusher
    # itself is constructed once below with the shared batching settings.
    if scenario == "retryable_with_resolver_failover":
        failover_addrs = iter(["http://a:1", "http://b:2"])
        target = "iris://x"
        extra = {"resolver": lambda _url: next(failover_addrs)}
    elif scenario == "retryable_static_url":
        target = "http://h:1"
        extra = {}
    elif scenario == "resolver_raises":
        resolver_calls: list[int] = []

        def flaky_resolver(_url):
            # Fail only the very first resolution attempt.
            resolver_calls.append(1)
            if len(resolver_calls) == 1:
                raise ConnectionError("controller down")
            return "http://good:1"

        target = "iris://x"
        extra = {"resolver": flaky_resolver}
    elif scenario == "non_retryable":
        target = "iris://x"
        extra = {"resolver": lambda _url: "http://a:1"}
    else:
        raise AssertionError(scenario)

    pusher: LogPusher = LogPusher(target, batch_size=1, flush_interval=0.1, **extra)

    try:
        # The first push makes the drain thread produce a client, except in
        # resolver_raises, where there is no client to seed an error on.
        pusher.push("k", [_entry("a")])
        if scenario != "resolver_raises":
            # Wait for "a" to ship before seeding the error, so the seeding
            # cannot race with the drain thread's next iteration.
            assert pusher.flush(timeout=5.0)
            if scenario == "non_retryable":
                seeded = ConnectError(Code.NOT_FOUND, "missing")
            else:
                seeded = ConnectError(Code.UNAVAILABLE, "gone")
            built[0].errors.append(seeded)

        pusher.push("k", [_entry("b")])

        # Deterministic wait until "b" has been processed (sent or dropped).
        assert pusher.flush(timeout=10.0)

        # The buffer-overflow path is not exercised here, so "processed"
        # implies "delivered": "b" must appear in some client's pushes.
        b_delivered = any(
            entry.data == "b"
            for client in built
            for batch in client.pushes
            for entry in batch.entries
        )
        assert b_delivered, "entry 'b' was never delivered to any client"

        if scenario.startswith("retryable"):
            # The retryable failure invalidated the first client and a
            # second one was built.
            assert len(built) >= 2
            assert built[0].closed is True
        else:
            # resolver_raises: the first resolver call raised, so no client
            # existed until the second call succeeded.
            # non_retryable: the same client retries without a rebuild.
            assert len(built) == 1
            if scenario == "non_retryable":
                # No invalidation either: the one client stays open.
                assert built[0].closed is False
    finally:
        pusher.close()
0 commit comments