4444import java .util .Set ;
4545import java .util .concurrent .Callable ;
4646import java .util .concurrent .atomic .AtomicReference ;
47- import java .util .function .BiConsumer ;
4847import java .util .function .Consumer ;
4948import java .util .function .Function ;
5049import java .util .function .Predicate ;
@@ -559,8 +558,9 @@ private void handleAppendFailure(
559558 String shortTableId ,
560559 AppendClientInfo appendClientInfo ,
561560 Callable <Boolean > tryCreateTable ,
562- BiConsumer <Iterable <AppendRowsContext <DestinationT >>, Boolean > initializeContexts ,
563- Consumer <Iterable <AppendRowsContext <DestinationT >>> clearClients ,
561+ Consumer <Iterable <AppendRowsContext <DestinationT >>> initializeContexts ,
562+ Runnable resetClient ,
563+ ValueState <String > streamName ,
564564 ValueState <Long > streamOffset ,
565565 MultiOutputReceiver o ) {
566566 // The first context is always the one that fails.
@@ -659,15 +659,6 @@ private void handleAppendFailure(
659659 throw new RuntimeException (e );
660660 }
661661
662- if (!quotaError ) {
663- // For known errors (offset mismatch, not found) we must reestablish
664- // the streams.
665- // However we've seen that doing this fixes random stuckness issues by reestablishing
666- // gRPC connections,
667- // so we close the clients for all non-quota errors.
668-
669- clearClients .accept (failedContexts );
670- }
671662 appendFailures .inc ();
672663 int retriedRows = failedContext .protoRows .getSerializedRowsCount ();
673664 BigQuerySinkMetrics .appendRowsRowStatusCounter (
@@ -722,11 +713,20 @@ private void handleAppendFailure(
722713 // Finalize the stream and clear streamName so a new stream will be created.
723714 o .get (flushTag )
724715 .output (KV .of (failedContext .streamName , new Operation (failedContext .offset - 1 , true )));
716+
717+ // Clear streamName so a new stream will be created in resetClient below.
718+ streamName .write ("" );
719+
720+ // Re-establish the client with the new stream.
721+ resetClient .run ();
722+
725723 // Reinitialize all contexts with the new stream and new offsets.
726- initializeContexts .accept (failedContexts , true );
724+ initializeContexts .accept (failedContexts );
727725
728726 // Offset failures imply that all subsequent parallel appends will also fail.
729727 // Retry them all.
728+ } else if (!quotaError ) {
729+ resetClient .run ();
730730 }
731731 }
732732
@@ -912,13 +912,9 @@ public void process(
912912
913913 // Initialize stream names and offsets for all contexts. This will be called initially, but
914914 // will also be called if we roll over to a new stream on a retry.
915- BiConsumer <Iterable <AppendRowsContext <DestinationT >>, Boolean > initializeContexts =
916- (contexts , isFailure ) -> {
915+ Consumer <Iterable <AppendRowsContext <DestinationT >>> initializeContexts =
916+ (contexts ) -> {
917917 try {
918- if (isFailure ) {
919- // Clear the stream name, forcing a new one to be created.
920- streamName .write ("" );
921- }
922918 String streamNameRead = Preconditions .checkArgumentNotNull (streamName .read ());
923919 long currentOffset = Preconditions .checkArgumentNotNull (streamOffset .read ());
924920 for (AppendRowsContext <DestinationT > context : contexts ) {
@@ -933,8 +929,8 @@ public void process(
933929 }
934930 };
935931
936- Consumer < Iterable < AppendRowsContext < DestinationT >>> clearClients =
937- (contexts ) -> {
932+ Runnable resetClient =
933+ () -> {
938934 try {
939935 appendClientHolder .invalidateAndReset ();
940936 } catch (Exception e ) {
@@ -967,7 +963,8 @@ public void process(
967963 appendClientHolder .get (),
968964 tryCreateTable ,
969965 initializeContexts ,
970- clearClients ,
966+ resetClient ,
967+ streamName ,
971968 streamOffset ,
972969 o );
973970 return RetryType .RETRY_ALL_OPERATIONS ;
@@ -1068,7 +1065,7 @@ public void process(
10681065 Iterable <AppendRowsContext <DestinationT >> contexts = retryManager .getRemainingContexts ();
10691066
10701067 if (numAppends > 0 ) {
1071- initializeContexts .accept (contexts , false );
1068+ initializeContexts .accept (contexts );
10721069 retryManager .run (true );
10731070
10741071 appendSplitDistribution .update (numAppends );
0 commit comments