55
66import com .azure .cosmos .BridgeInternal ;
77import com .azure .cosmos .ConsistencyLevel ;
8+ import com .azure .cosmos .implementation .AuthorizationTokenType ;
89import com .azure .cosmos .implementation .ClientSideRequestStatistics ;
910import com .azure .cosmos .implementation .DiagnosticsClientContext ;
1011import com .azure .cosmos .implementation .FailureValidator ;
1112import com .azure .cosmos .implementation .GoneException ;
13+ import com .azure .cosmos .implementation .HttpConstants .SubStatusCodes ;
1214import com .azure .cosmos .implementation .IAuthorizationTokenProvider ;
1315import com .azure .cosmos .implementation .ISessionContainer ;
16+ import com .azure .cosmos .implementation .OperationType ;
1417import com .azure .cosmos .implementation .PartitionIsMigratingException ;
1518import com .azure .cosmos .implementation .PartitionKeyRangeGoneException ;
1619import com .azure .cosmos .implementation .PartitionKeyRangeIsSplittingException ;
1720import com .azure .cosmos .implementation .RequestTimeoutException ;
21+ import com .azure .cosmos .implementation .ResourceType ;
1822import com .azure .cosmos .implementation .RetryContext ;
1923import com .azure .cosmos .implementation .RxDocumentServiceRequest ;
2024import com .azure .cosmos .implementation .SessionTokenHelper ;
3337
3438import java .time .Duration ;
3539import java .util .ArrayList ;
40+ import java .util .Arrays ;
3641import java .util .Collections ;
3742import java .util .HashMap ;
3843import java .util .List ;
3944import java .util .Map ;
45+ import java .util .UUID ;
4046import java .util .concurrent .CountDownLatch ;
4147import java .util .concurrent .CyclicBarrier ;
4248import java .util .concurrent .TimeUnit ;
@@ -213,7 +219,7 @@ public void getLsnAndGlobalCommittedLsn() {
213219 StoreResponse sr = new StoreResponse (null , 0 , headers , null , 0 );
214220 Utils .ValueHolder <Long > lsn = Utils .ValueHolder .initialize (-2l );
215221 Utils .ValueHolder <Long > globalCommittedLsn = Utils .ValueHolder .initialize (-2l );
216- ConsistencyWriter .getLsnAndGlobalCommittedLsn (sr , lsn , globalCommittedLsn );
222+ ConsistencyWriter .getLsnAndGlobalCommittedLsn (sr , lsn , globalCommittedLsn , BarrierType . GLOBAL_STRONG_WRITE );
217223 assertThat (lsn .v ).isEqualTo (3 );
218224 assertThat (globalCommittedLsn .v ).isEqualTo (2 );
219225 }
@@ -358,6 +364,226 @@ public void isGlobalStrongRequest(ConsistencyLevel defaultConsistencyLevel, RxDo
358364 assertThat (consistencyWriter .isGlobalStrongRequest (req , storeResponse )).isEqualTo (isGlobalStrongExpected );
359365 }
360366
367+ @ Test (groups = "unit" )
368+ public void writeAsyncGlobalStrongRequest () {
369+ runWriteAsyncBarrierableRequestTest (true , true );
370+ }
371+
372+ @ Test (groups = "unit" )
373+ public void writeAsyncGlobalStrongRequestFailed () {
374+ runWriteAsyncBarrierableRequestTest (true , false );
375+ }
376+
377+ @ Test (groups = "unit" )
378+ public void writeAsyncNRegionCommitRequest () {
379+ runWriteAsyncBarrierableRequestTest (false , true );
380+ }
381+
382+ @ Test (groups = "unit" )
383+ public void writeAsyncNRegionCommitRequestFailed () {
384+ runWriteAsyncBarrierableRequestTest (false , false );
385+ }
386+
387+ @ Test (groups = "unit" )
388+ public void writeAsyncNoBarrierRequest () {
389+ initializeConsistencyWriter (false );
390+ RxDocumentServiceRequest request = mockDocumentServiceRequest (clientContext );
391+ TimeoutHelper timeoutHelper = Mockito .mock (TimeoutHelper .class );
392+ Mockito .doReturn (false ).when (timeoutHelper ).isElapsed ();
393+ StoreResponse storeResponse = Mockito .mock (StoreResponse .class );
394+ Mockito .doReturn ("0" ).when (storeResponse ).getHeaderValue (WFConstants .BackendHeaders .NUMBER_OF_READ_REGIONS );
395+ Mockito .doReturn (ConsistencyLevel .SESSION ).when (serviceConfigReader ).getDefaultConsistencyLevel ();
396+ ConsistencyWriter spyWriter = Mockito .spy (this .consistencyWriter );
397+ Mockito .doReturn (Mono .just (storeResponse )).when (spyWriter ).barrierForWriteRequests (Mockito .any (), Mockito .any (), Mockito .any ());
398+ AddressInformation addressInformation = Mockito .mock (AddressInformation .class );
399+ Uri primaryUri = Mockito .mock (Uri .class );
400+ Mockito .doReturn (true ).when (primaryUri ).isPrimary ();
401+ Mockito .doReturn ("Healthy" ).when (primaryUri ).getHealthStatusDiagnosticString ();
402+ Mockito .doReturn (primaryUri ).when (addressInformation ).getPhysicalUri ();
403+ List <AddressInformation > addressList = Collections .singletonList (addressInformation );
404+ Mockito .doReturn (Mono .just (addressList )).when (addressSelector ).resolveAddressesAsync (Mockito .any (), Mockito .anyBoolean ());
405+ Mockito .doReturn (Mono .just (storeResponse )).when (transportClient ).invokeResourceOperationAsync (Mockito .any (Uri .class ), Mockito .any (RxDocumentServiceRequest .class ));
406+ Mono <StoreResponse > result = spyWriter .writeAsync (request , timeoutHelper , false );
407+ StepVerifier .create (result )
408+ .expectNext (storeResponse )
409+ .expectComplete ()
410+ .verify ();
411+ }
412+
413+ @ Test (groups = "unit" )
414+ public void getBarrierRequestType () {
415+ // Setup ConsistencyWriter with useMultipleWriteLocations false
416+ initializeConsistencyWriter (false );
417+ ConsistencyWriter writer = this .consistencyWriter ;
418+ RxDocumentServiceRequest request = mockDocumentServiceRequest (clientContext );
419+ StoreResponse response = Mockito .mock (StoreResponse .class );
420+
421+ // 1. Global strong enabled and isGlobalStrongRequest returns true
422+ try (MockedStatic <ReplicatedResourceClient > replicatedResourceClientMock = Mockito .mockStatic (ReplicatedResourceClient .class )) {
423+ replicatedResourceClientMock .when (ReplicatedResourceClient ::isGlobalStrongEnabled ).thenReturn (true );
424+ ConsistencyWriter spyWriter = Mockito .spy (writer );
425+ Mockito .doReturn (true ).when (spyWriter ).isGlobalStrongRequest (request , response );
426+ BarrierType barrierType = spyWriter .getBarrierRequestType (request , response );
427+ assertThat (barrierType ).isEqualTo (BarrierType .GLOBAL_STRONG_WRITE );
428+ }
429+
430+ // 2. NRegionSynchronousCommitEnabled path
431+ // Setup request.requestContext.getNRegionSynchronousCommitEnabled() to true
432+ request .requestContext .setNRegionSynchronousCommitEnabled (true );
433+ // useMultipleWriteLocations is already false
434+ Mockito .doReturn ("123" ).when (response ).getHeaderValue (WFConstants .BackendHeaders .GLOBAL_N_REGION_COMMITTED_GLSN );
435+ Mockito .doReturn (2L ).when (response ).getNumberOfReadRegions ();
436+ BarrierType barrierType = writer .getBarrierRequestType (request , response );
437+ assertThat (barrierType ).isEqualTo (BarrierType .N_REGION_SYNCHRONOUS_COMMIT );
438+
439+
440+ // 3. Negative case: NRegionSynchronousCommitEnabled false
441+ request .requestContext .setNRegionSynchronousCommitEnabled (false );
442+ BarrierType negativeResult = writer .getBarrierRequestType (request , response );
443+ assertThat (negativeResult ).isEqualTo (BarrierType .NONE );
444+
445+ // 4. Negative case: useMultipleWriteLocations true
446+ initializeConsistencyWriter (true );
447+ writer = this .consistencyWriter ;
448+ request .requestContext .setNRegionSynchronousCommitEnabled (true );
449+ BarrierType negativeResult2 = writer .getBarrierRequestType (request , response );
450+ assertThat (negativeResult2 ).isEqualTo (BarrierType .NONE );
451+
452+ // 5. Negative case: GLOBAL_NREGION_COMMITTED_LSN header missing
453+ initializeConsistencyWriter (false );
454+ writer = this .consistencyWriter ;
455+ request .requestContext .setNRegionSynchronousCommitEnabled (true );
456+ Mockito .doReturn (null ).when (response ).getHeaderValue (WFConstants .BackendHeaders .GLOBAL_N_REGION_COMMITTED_GLSN );
457+ BarrierType negativeResult3 = writer .getBarrierRequestType (request , response );
458+ assertThat (negativeResult3 ).isEqualTo (BarrierType .NONE );
459+
460+ // 6. Negative case: NUMBER_OF_READ_REGIONS header missing or zero
461+ Mockito .doReturn (0L ).when (response ).getNumberOfReadRegions ();
462+ BarrierType negativeResult4 = writer .getBarrierRequestType (request , response );
463+ assertThat (negativeResult4 ).isEqualTo (BarrierType .NONE );
464+ }
465+
466+ private void runWriteAsyncBarrierableRequestTest (boolean globalStrong , boolean barrierMet ) {
467+ RxDocumentServiceRequest request = setupRequest (!globalStrong );
468+ TimeoutHelper timeoutHelper = Mockito .mock (TimeoutHelper .class );
469+ Mockito .doReturn (false ).when (timeoutHelper ).isElapsed ();
470+ StoreResponse storeResponse = setupStoreResponse (!globalStrong );
471+ List <AddressInformation > addressList = setupAddressList ();
472+ List <StoreResult > storeResults = new ArrayList <>();
473+ if (barrierMet ) {
474+ storeResults .add (getStoreResult (storeResponse , 1L ));
475+ storeResults .add (getStoreResult (storeResponse , 2L ));
476+ } else {
477+ storeResults .add (getStoreResult (storeResponse , 1L ));
478+ }
479+ StoreReader storeReader = setupStoreReader (storeResults );
480+ initializeConsistencyWriterWithStoreReader (false , storeReader );
481+ ConsistencyWriter spyWriter = Mockito .spy (this .consistencyWriter );
482+ Mockito .doReturn (globalStrong ? ConsistencyLevel .STRONG : ConsistencyLevel .SESSION )
483+ .when (serviceConfigReader ).getDefaultConsistencyLevel ();
484+ Mockito .doReturn (Mono .just (addressList )).when (addressSelector ).resolveAddressesAsync (Mockito .any (), Mockito .anyBoolean ());
485+ Mockito .doReturn (Mono .just (storeResponse )).when (transportClient ).invokeResourceOperationAsync (Mockito .any (Uri .class ), Mockito .any (RxDocumentServiceRequest .class ));
486+ Mono <StoreResponse > result = spyWriter .writeAsync (request , timeoutHelper , false );
487+ if (!barrierMet ) {
488+ StepVerifier .create (result )
489+ .expectErrorSatisfies (error -> {
490+ assertThat (error ).isInstanceOf (GoneException .class );
491+ FailureValidator failureValidator = FailureValidator .builder ()
492+ .instanceOf (GoneException .class )
493+ .statusCode (GONE )
494+ .subStatusCode (globalStrong ? SubStatusCodes .GLOBAL_STRONG_WRITE_BARRIER_NOT_MET : SubStatusCodes .GLOBAL_N_REGION_COMMIT_WRITE_BARRIER_NOT_MET )
495+ .build ();
496+ failureValidator .validate (error );
497+ })
498+ .verify ();
499+ } else {
500+ StepVerifier .create (result )
501+ .expectNext (storeResponse )
502+ .expectComplete ()
503+ .verify ();
504+ }
505+ }
506+
507+ private RxDocumentServiceRequest setupRequest (boolean nRegionCommit ) {
508+ RxDocumentServiceRequest request = mockDocumentServiceRequest (clientContext );
509+ if (nRegionCommit ) {
510+ request .requestContext .setNRegionSynchronousCommitEnabled (true );
511+ }
512+ Mockito .doReturn (ResourceType .Document ).when (request ).getResourceType ();
513+ Mockito .doReturn (OperationType .Create ).when (request ).getOperationType ();
514+ Mockito .doReturn ("1-MxAPlgMgA=" ).when (request ).getResourceId ();
515+ request .authorizationTokenType = AuthorizationTokenType .PrimaryMasterKey ;
516+ return request ;
517+ }
518+
519+ private StoreResponse setupStoreResponse (boolean nRegionCommit ) {
520+ StoreResponse storeResponse = Mockito .mock (StoreResponse .class );
521+ Mockito .doReturn (1L ).when (storeResponse ).getNumberOfReadRegions ();
522+ Mockito .doReturn ("1" ).when (storeResponse ).getHeaderValue (WFConstants .BackendHeaders .NUMBER_OF_READ_REGIONS );
523+ if (nRegionCommit ) {
524+ Mockito .doReturn ("1" ).when (storeResponse ).getHeaderValue (WFConstants .BackendHeaders .GLOBAL_N_REGION_COMMITTED_GLSN );
525+ } else {
526+ Mockito .doReturn ("1" ).when (storeResponse ).getHeaderValue (WFConstants .BackendHeaders .GLOBAL_COMMITTED_LSN );
527+ }
528+ Mockito .doReturn ("2" ).when (storeResponse ).getHeaderValue (WFConstants .BackendHeaders .LSN );
529+ return storeResponse ;
530+ }
531+
532+ private List <AddressInformation > setupAddressList () {
533+ AddressInformation addressInformation = Mockito .mock (AddressInformation .class );
534+ Uri primaryUri = Mockito .mock (Uri .class );
535+ Mockito .doReturn (true ).when (primaryUri ).isPrimary ();
536+ Mockito .doReturn ("Healthy" ).when (primaryUri ).getHealthStatusDiagnosticString ();
537+ Mockito .doReturn (primaryUri ).when (addressInformation ).getPhysicalUri ();
538+ return Collections .singletonList (addressInformation );
539+ }
540+
541+ private StoreReader setupStoreReader (List <StoreResult > storeResults ) {
542+ StoreReader storeReader = Mockito .mock (StoreReader .class );
543+ Mono <List <StoreResult >>[] monos = storeResults .stream ()
544+ .map (Collections ::singletonList )
545+ .map (Mono ::just )
546+ .toArray (Mono []::new );
547+ Mockito .when (storeReader .readMultipleReplicaAsync (
548+ Mockito .any (),
549+ Mockito .anyBoolean (),
550+ Mockito .anyInt (),
551+ Mockito .anyBoolean (),
552+ Mockito .anyBoolean (),
553+ Mockito .any (),
554+ Mockito .anyBoolean (),
555+ Mockito .anyBoolean ()))
556+ .thenReturn (monos .length > 0 ? monos [0 ] : Mono .empty (),
557+ Arrays .copyOfRange (monos , 1 , monos .length ));
558+ return storeReader ;
559+ }
560+
561+ private StoreResult getStoreResult (StoreResponse storeResponse , long globalCommittedLsn ) {
562+ return new StoreResult (
563+ storeResponse ,
564+ null ,
565+ "1" ,
566+ 1 ,
567+ 1 ,
568+ 1.0 ,
569+ UUID .randomUUID ().toString (),
570+ UUID .randomUUID ().toString (),
571+ 4 ,
572+ 2 ,
573+ true ,
574+ null ,
575+ globalCommittedLsn ,
576+ globalCommittedLsn ,
577+ 1 ,
578+ 1 ,
579+ null ,
580+ 0.3 ,
581+ 90.0 );
582+ }
583+
584+
585+
586+
361587 private void initializeConsistencyWriter (boolean useMultipleWriteLocation ) {
362588 addressSelector = Mockito .mock (AddressSelector .class );
363589 sessionContainer = Mockito .mock (ISessionContainer .class );
@@ -375,6 +601,24 @@ private void initializeConsistencyWriter(boolean useMultipleWriteLocation) {
375601 null );
376602 }
377603
604+ private void initializeConsistencyWriterWithStoreReader (boolean useMultipleWriteLocation , StoreReader reader ) {
605+ addressSelector = Mockito .mock (AddressSelector .class );
606+ sessionContainer = Mockito .mock (ISessionContainer .class );
607+ transportClient = Mockito .mock (TransportClient .class );
608+ IAuthorizationTokenProvider authorizationTokenProvider = Mockito .mock (IAuthorizationTokenProvider .class );
609+ serviceConfigReader = Mockito .mock (GatewayServiceConfigurationReader .class );
610+
611+ consistencyWriter = new ConsistencyWriter (clientContext ,
612+ addressSelector ,
613+ sessionContainer ,
614+ transportClient ,
615+ authorizationTokenProvider ,
616+ serviceConfigReader ,
617+ useMultipleWriteLocation ,
618+ reader ,
619+ null );
620+ }
621+
378622 public static <T > void validateError (Mono <T > single ,
379623 FailureValidator validator ) {
380624 TestSubscriber <T > testSubscriber = TestSubscriber .create ();
0 commit comments