55namespace Microsoft . Azure . Cosmos . SDK . EmulatorTests
66{
77 using System ;
8+ using System . Collections . Concurrent ;
89 using System . Collections . Generic ;
910 using System . Diagnostics ;
1011 using System . IO ;
@@ -19,6 +20,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
1920 using System . Threading ;
2021 using System . Threading . Tasks ;
2122 using global ::Azure ;
23+ using Microsoft . Azure . Cosmos . FaultInjection ;
2224 using Microsoft . Azure . Cosmos . Query . Core ;
2325 using Microsoft . Azure . Cosmos . Services . Management . Tests . LinqProviderTests ;
2426 using Microsoft . Azure . Cosmos . Telemetry ;
@@ -936,7 +938,136 @@ public async Task MultiRegionAccountTest()
936938 AccountProperties properties = await cosmosClient . ReadAccountAsync ( ) ;
937939 Assert . IsNotNull ( properties ) ;
938940 }
939-
941+
/// <summary>
/// Exercises channel health tracking when create-item requests time out.
/// A fault injection rule delays the send of every CreateItem operation by 100 seconds while the
/// client request timeout is 2 seconds. After three such writes the test walks
/// TransportClient -> ChannelDictionary -> channels through reflection and expects the open channel
/// of one server entry to be unhealthy and the open channel of another server entry to be healthy.
/// </summary>
[TestMethod]
[Owner("amudumba")]
public async Task CreateItemDuringTimeoutTest()
{
    const string aggressiveTimeoutDetectionEnabledVariable = "AZURE_COSMOS_AGGRESSIVE_TIMEOUT_DETECTION_ENABLED";
    const string timeoutDetectionTimeLimitVariable = "AZURE_COSMOS_TIMEOUT_DETECTION_TIME_LIMIT_IN_SECONDS";

    CosmosClient cosmosClient = null;
    Cosmos.Database db = null;
    try
    {
        // Prepare.
        // Enable aggressive timeout detection, which empowers the connection health checker to mark a
        // channel/connection as "unhealthy" after a set of consecutive timeouts.
        // The variables are set inside the try block so the finally block always clears them, even
        // when building the rule or the client fails.
        Environment.SetEnvironmentVariable(aggressiveTimeoutDetectionEnabledVariable, "True");
        Environment.SetEnvironmentVariable(timeoutDetectionTimeLimitVariable, "1");

        // Fault injection rule that simulates a timeout: every CreateItem send is delayed far beyond
        // the client's request timeout.
        string timeoutRuleId = "timeoutRule-" + Guid.NewGuid().ToString();
        FaultInjectionRule timeoutRule = new FaultInjectionRuleBuilder(
            id: timeoutRuleId,
            condition:
                new FaultInjectionConditionBuilder()
                    .WithOperationType(FaultInjectionOperationType.CreateItem)
                    .Build(),
            result:
                FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.SendDelay)
                    .WithDelay(TimeSpan.FromSeconds(100))
                    .Build())
            .Build();

        List<FaultInjectionRule> rules = new List<FaultInjectionRule> { timeoutRule };
        FaultInjector faultInjector = new FaultInjector(rules);

        CosmosClientOptions cosmosClientOptions = new CosmosClientOptions()
        {
            ConsistencyLevel = Cosmos.ConsistencyLevel.Session,
            FaultInjector = faultInjector,
            RequestTimeout = TimeSpan.FromSeconds(2)
        };

        cosmosClient = TestCommon.CreateCosmosClient(clientOptions: cosmosClientOptions);

        db = await cosmosClient.CreateDatabaseIfNotExistsAsync("TimeoutFaultTest");
        Container container = await db.CreateContainerIfNotExistsAsync("TimeoutFaultContainer", "/pk");

        // Act.
        // Simulate an aggressive timeout scenario by performing 3 writes which must all time out due to
        // the fault injection rule.
        for (int i = 0; i < 3; i++)
        {
            try
            {
                ToDoActivity testItem = ToDoActivity.CreateRandomToDoActivity();
                await container.CreateItemAsync<ToDoActivity>(testItem);

                // Reaching this line means the injected delay did not take effect; fail here instead
                // of letting the channel assertions below report a less obvious error.
                Assert.Fail("CreateItemAsync was expected to time out because of the fault injection rule.");
            }
            catch (CosmosException exx)
            {
                Assert.AreEqual(HttpStatusCode.RequestTimeout, exx.StatusCode);
            }
        }

        // Assert.
        // Get all the channels that are under TransportClient -> ChannelDictionary -> channels.
        StoreClientFactory storeClientFactory = GetNonPublicInstanceField<StoreClientFactory>(
            cosmosClient.DocumentClient,
            "storeClientFactory");
        Documents.Rntbd.TransportClient transportClient = GetNonPublicInstanceField<Documents.Rntbd.TransportClient>(
            storeClientFactory,
            "transportClient");
        Documents.Rntbd.ChannelDictionary channelDict = GetNonPublicInstanceField<Documents.Rntbd.ChannelDictionary>(
            transportClient,
            "channelDictionary");
        ConcurrentDictionary<Documents.Rntbd.ServerKey, Documents.Rntbd.IChannel> allChannels =
            GetNonPublicInstanceField<ConcurrentDictionary<Documents.Rntbd.ServerKey, Documents.Rntbd.IChannel>>(
                channelDict,
                "channels");

        // Take a single snapshot of the keys so both lookups below see the same set of servers.
        List<Documents.Rntbd.ServerKey> serverKeys = allChannels.Keys.ToList();
        Assert.IsTrue(
            serverKeys.Count >= 2,
            $"Expected at least 2 server entries in the channel dictionary but found {serverKeys.Count}.");

        // NOTE(review): ConcurrentDictionary does not document an enumeration order, so treating the
        // second key as the timed-out server and the first key as the healthy one is an assumption
        // inherited from the original test - confirm it is stable on all target runtimes.

        // The old channel must have been made unhealthy by the timeouts.
        AssertSingleOpenChannelHealth(allChannels[serverKeys[1]], expectedHealthy: false);

        // The new channel, picked as the first entry of the dictionary, must be healthy.
        AssertSingleOpenChannelHealth(allChannels[serverKeys[0]], expectedHealthy: true);
    }
    finally
    {
        Environment.SetEnvironmentVariable(aggressiveTimeoutDetectionEnabledVariable, null);
        Environment.SetEnvironmentVariable(timeoutDetectionTimeLimitVariable, null);

        try
        {
            if (db != null)
            {
                await db.DeleteAsync();
            }
        }
        finally
        {
            // Dispose last: deleting the database still needs the client.
            cosmosClient?.Dispose();
        }
    }
}

/// <summary>
/// Asserts that the given per-server channel has exactly one open channel and that this channel
/// reports the expected health.
/// Navigates LoadBalancingChannel -> LoadBalancingPartition -> LbChannelState -> IChannel via reflection.
/// </summary>
/// <param name="serverChannel">Entry taken from the channel dictionary; must be a LoadBalancingChannel.</param>
/// <param name="expectedHealthy">Expected value of <c>Healthy</c> on the open channel.</param>
private static void AssertSingleOpenChannelHealth(Documents.Rntbd.IChannel serverChannel, bool expectedHealthy)
{
    Documents.Rntbd.LoadBalancingChannel loadBalancingChannel = (Documents.Rntbd.LoadBalancingChannel)serverChannel;
    Documents.Rntbd.LoadBalancingPartition loadBalancingPartition = GetNonPublicInstanceField<Documents.Rntbd.LoadBalancingPartition>(
        loadBalancingChannel,
        "singlePartition");

    Assert.IsNotNull(loadBalancingPartition);

    List<Documents.Rntbd.LbChannelState> openChannels = GetNonPublicInstanceField<List<Documents.Rntbd.LbChannelState>>(
        loadBalancingPartition,
        "openChannels");
    Assert.AreEqual(1, openChannels.Count);

    foreach (Documents.Rntbd.LbChannelState channelState in openChannels)
    {
        // Resolve the field on the state being inspected rather than on openChannels[0].
        Documents.Rntbd.IChannel channel = GetNonPublicInstanceField<Documents.Rntbd.IChannel>(channelState, "channel");
        Assert.AreEqual(expectedHealthy, channel.Healthy);
    }
}

/// <summary>
/// Reads a non-public instance field declared on the runtime type of <paramref name="instance"/>.
/// Fails the test with a descriptive message when the instance is null or the field cannot be found,
/// instead of surfacing a NullReferenceException from the reflection chain.
/// </summary>
/// <typeparam name="TField">Type the field value is cast to.</typeparam>
/// <param name="instance">Object whose field is read.</param>
/// <param name="fieldName">Name of the non-public instance field.</param>
/// <returns>The field value cast to <typeparamref name="TField"/>; may be null for reference types.</returns>
private static TField GetNonPublicInstanceField<TField>(object instance, string fieldName)
{
    Assert.IsNotNull(instance, $"Cannot read field '{fieldName}' from a null instance.");

    FieldInfo field = instance.GetType().GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Instance);
    Assert.IsNotNull(field, $"Field '{fieldName}' was not found on type '{instance.GetType().FullName}'.");

    return (TField)field.GetValue(instance);
}
9401071 public static IReadOnlyList < string > GetActiveConnections ( )
9411072 {
9421073 string testPid = Process . GetCurrentProcess ( ) . Id . ToString ( ) ;
0 commit comments