-
Notifications
You must be signed in to change notification settings - Fork 329
Expand file tree
/
Copy pathMarsSessionPoolingTest.cs
More file actions
474 lines (411 loc) · 20.7 KB
/
MarsSessionPoolingTest.cs
File metadata and controls
474 lines (411 loc) · 20.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.Data;
using System.Linq;
using System.Threading;
using Microsoft.Data.SqlClient.Tests.Common;
using Xunit;
#nullable enable
namespace Microsoft.Data.SqlClient.ManualTesting.Tests
{
public class MarsSessionPoolingTest
{
private const int ConcurrentCommands = 5;
// Synapse: Catalog view 'dm_exec_connections' is not supported in this version.
[ConditionalTheory(typeof(DataTestUtility), nameof(DataTestUtility.IsNotAzureSynapse), nameof(DataTestUtility.IsNotManagedInstance))]
[InlineData(CommandType.Text)]
[InlineData(CommandType.StoredProcedure)]
public void ExecuteScalar_DisposeCommand(CommandType commandType)
{
// Arrange
using SqlConnection connection = GetConnection();
using DisposableArray<SqlCommand> commands = GetCommands(connection, commandType);
connection.Open();
// Act / Assert
foreach (SqlCommand command in commands)
{
// Act
// - Run command
command.ExecuteScalar();
// - Dispose command
command.Dispose();
// Assert
// - Count of sessions/requests should stay the same
// Each request runs to completion, none are running concurrently, so additional
// MARS sessions do not need to be opened.
AssertSessionsAndRequests(connection, openMarsSessions: 0, openRequests: 0);
}
}
[ConditionalTheory(typeof(DataTestUtility), nameof(DataTestUtility.IsNotAzureSynapse), nameof(DataTestUtility.IsNotManagedInstance))]
[InlineData(CommandType.Text)]
[InlineData(CommandType.StoredProcedure)]
public void ExecuteScalar_CloseConnection(CommandType commandType)
{
// Arrange
using SqlConnection connection = GetConnection();
using DisposableArray<SqlCommand> commands = GetCommands(connection, commandType);
connection.Open();
// Act / Assert
foreach (SqlCommand command in commands)
{
// Act
// - Run command
command.ExecuteScalar();
// - Close and reopen connection (return to pool)
connection.Close();
connection.Open();
// Assert
// - Count of sessions/requests should stay the same
// Each request runs to completion, none are running concurrently, so additional
// MARS sessions do not need to be opened.
AssertSessionsAndRequests(connection, openMarsSessions: 0, openRequests: 0);
}
}
[ConditionalTheory(typeof(DataTestUtility), nameof(DataTestUtility.IsNotAzureSynapse), nameof(DataTestUtility.IsNotManagedInstance))]
[InlineData(CommandType.Text)]
[InlineData(CommandType.StoredProcedure)]
public void ExecuteNonQuery_DisposeCommand(CommandType commandType)
{
// Arrange
using SqlConnection connection = GetConnection();
using DisposableArray<SqlCommand> commands = GetCommands(connection, commandType);
connection.Open();
// Act / Assert
foreach (SqlCommand command in commands)
{
// Act
// - Run command
command.ExecuteNonQuery();
// - Dispose command
command.Dispose();
// Assert
// - Count of sessions/requests should stay the same
// Each request runs to completion, none are running concurrently, so additional
// MARS sessions do not need to be opened.
AssertSessionsAndRequests(connection, openMarsSessions: 0, openRequests: 0);
}
}
[ConditionalTheory(typeof(DataTestUtility), nameof(DataTestUtility.IsNotAzureSynapse), nameof(DataTestUtility.IsNotManagedInstance))]
[InlineData(CommandType.Text)]
[InlineData(CommandType.StoredProcedure)]
public void ExecuteNonQuery_CloseConnection(CommandType commandType)
{
// Arrange
using SqlConnection connection = GetConnection();
using DisposableArray<SqlCommand> commands = GetCommands(connection, commandType);
connection.Open();
// Act / Assert
foreach (SqlCommand command in commands)
{
// Act
// - Run command
command.ExecuteNonQuery();
// - Close and reopen connection (return to pool)
connection.Close();
connection.Open();
// Assert
// - Count of sessions/requests should stay the same
// Each request runs to completion, none are running concurrently, so additional
// MARS sessions do not need to be opened.
AssertSessionsAndRequests(connection, openMarsSessions: 0, openRequests: 0);
}
}
[ConditionalTheory(typeof(DataTestUtility), nameof(DataTestUtility.IsNotAzureSynapse), nameof(DataTestUtility.IsNotManagedInstance))]
[InlineData(CommandType.Text)]
[InlineData(CommandType.StoredProcedure)]
public void ExecuteReader_CloseReader(CommandType commandType)
{
// Arrange
using SqlConnection connection = GetConnection();
using DisposableArray<SqlCommand> commands = GetCommands(connection, commandType);
using DisposableArray<SqlDataReader> readers = new DisposableArray<SqlDataReader>(commands.Length);
connection.Open();
// Act / Assert
for (int i = 0; i < commands.Length; i++)
{
// Act
// - Run command
readers[i] = commands[i].ExecuteReader();
// - Close reader
readers[i].Close();
// Assert
// - Count of sessions/requests should stay the same
// Closing the reader completes the request, so no requests are running
// concurrently, so no additional MARS sessions should have been opened.
AssertSessionsAndRequests(connection, openMarsSessions: 0, openRequests: 0);
}
}
[ConditionalTheory(typeof(DataTestUtility), nameof(DataTestUtility.IsNotAzureSynapse), nameof(DataTestUtility.IsNotManagedInstance))]
[InlineData(CommandType.Text)]
[InlineData(CommandType.StoredProcedure)]
public void ExecuteReader_DisposeReader(CommandType commandType)
{
// Arrange
using SqlConnection connection = GetConnection();
using DisposableArray<SqlCommand> commands = GetCommands(connection, commandType);
using DisposableArray<SqlDataReader> readers = new DisposableArray<SqlDataReader>(commands.Length);
connection.Open();
// Act / Assert
for (int i = 0; i < commands.Length; i++)
{
// Act
// - Run command
readers[i] = commands[i].ExecuteReader();
// - Dispose reader
readers[i].Dispose();
// Assert
// - Count of sessions/requests should stay the same
// Disposing the reader completes the request, so no requests are running
// concurrently, so no additional MARS sessions should have been opened.
AssertSessionsAndRequests(connection, openMarsSessions: 0, openRequests: 0);
}
}
[Trait("Category", "flaky")]
[ConditionalTheory(typeof(DataTestUtility), nameof(DataTestUtility.IsNotAzureSynapse), nameof(DataTestUtility.IsNotManagedInstance))]
[InlineData(CommandType.Text)]
[InlineData(CommandType.StoredProcedure)]
public void ExecuteReader_GarbageCollectReader(CommandType commandType)
{
// Arrange
using SqlConnection connection = GetConnection();
using DisposableArray<SqlCommand> commands = GetCommands(connection, commandType);
connection.Open();
// Act / Assert
for (int i = 0; i < commands.Length; i++)
{
// Act
// - Run command and get weak reference to reader.
// This must happen in another scope otherwise the reader will not be marked for
// garbage collection.
WeakReference readerWeakReference = OpenReaderThenNullify(commands[i]);
// - Run the garbage collector
GC.Collect();
GC.WaitForPendingFinalizers();
// Assert
// - Make sure reader has been collected by now, otherwise results are invalid
Assert.False(readerWeakReference.IsAlive);
// - Count of open sessions/requests will increase with each iteration
// Finalizing a data reader does *not* close it, meaning the MARS session is left
// in an incomplete state. As such, with each command that's executed, a new
// session is opened.
AssertSessionsAndRequests(connection, openMarsSessions: i + 1, openRequests: i + 1);
}
}
[ConditionalTheory(typeof(DataTestUtility), nameof(DataTestUtility.IsNotAzureSynapse), nameof(DataTestUtility.IsNotManagedInstance))]
[InlineData(CommandType.Text)]
[InlineData(CommandType.StoredProcedure)]
public void ExecuteReader_DisposeCommand(CommandType commandType)
{
// Arrange
using SqlConnection connection = GetConnection();
using DisposableArray<SqlCommand> commands = GetCommands(connection, commandType);
using DisposableArray<SqlDataReader> readers = new DisposableArray<SqlDataReader>(commands.Length);
connection.Open();
// Act / Assert
for (int i = 0; i < commands.Length; i++)
{
// Act
// - Run command
readers[i] = commands[i].ExecuteReader();
// - Dispose the command
commands[i].Dispose();
// Assert
// - Count of open sessions/requests will increase with each iteration
// Disposing of the command does *not* close the reader, meaning the MARS session
// is left in an incomplete state. As such, with each command that's executed, a
// new session is opened.
AssertSessionsAndRequests(connection, openMarsSessions: i + 1, openRequests: i + 1);
}
}
[ConditionalTheory(typeof(DataTestUtility), nameof(DataTestUtility.IsNotAzureSynapse), nameof(DataTestUtility.IsNotManagedInstance))]
[InlineData(CommandType.Text)]
[InlineData(CommandType.StoredProcedure)]
public void ExecuteReader_CloseConnection(CommandType commandType)
{
// Arrange
using SqlConnection connection = GetConnection();
using DisposableArray<SqlCommand> commands = GetCommands(connection, commandType);
using DisposableArray<SqlDataReader> readers = new DisposableArray<SqlDataReader>(commands.Length);
connection.Open();
// Act / Assert
for (int i = 0; i < commands.Length; i++)
{
// Act
// - Run command
readers[i] = commands[i].ExecuteReader();
// - Close and reopen connection (return to pool)
connection.Close();
connection.Open();
// Assert
// - Count of sessions/requests should stay the same
// Closing the connection completes any pending requests, so no requests are
// running concurrently, so no additional MARS sessions should have been opened.
AssertSessionsAndRequests(connection, openMarsSessions: 0, openRequests: 0);
}
}
[Trait("Category", "flaky")]
[ConditionalTheory(typeof(DataTestUtility), nameof(DataTestUtility.IsNotAzureSynapse), nameof(DataTestUtility.IsNotManagedInstance))]
[InlineData(CommandType.Text)]
[InlineData(CommandType.StoredProcedure)]
public void ExecuteReader_NoCloses(CommandType commandType)
{
// Arrange
using SqlConnection connection = GetConnection();
using DisposableArray<SqlCommand> commands = GetCommands(connection, commandType);
using DisposableArray<SqlDataReader> readers = new DisposableArray<SqlDataReader>(commands.Length);
connection.Open();
// Act / Assert
for (int i = 0; i < commands.Length; i++)
{
// Act
// - Run command, close nothing!
readers[i] = commands[i].ExecuteReader();
// Assert
// - Count of open sessions/requests will increase with each iteration
// Leaving a data reader open leaves the MARS session in an incomplete state. As
// such, with each command that's executed, a new session is opened.
AssertSessionsAndRequests(connection, openMarsSessions: i + 1, openRequests: i + 1);
}
}
/// <summary>
/// Asserts the number of open sessions and pending requests on the connection.
/// </summary>
/// <param name="connection">Connection to check open sessions and pending requests on</param>
/// <param name="openMarsSessions">
/// Number of MARS sessions expected to be open/in use by the test. The sessions for the
/// main connection and validation query will be added before assertion.
/// </param>
/// <param name="openRequests">
/// Number of open requests expected to be pending by use of the test. The request for the
/// validation query will be added before assertion.
/// </param>
/// <exception cref="Exception">
/// Thrown if any of the validation result sets are missing or empty.
/// </exception>
private static void AssertSessionsAndRequests(
SqlConnection connection,
int openMarsSessions,
int openRequests)
{
const int maxAttempts = 5;
// For these tests, the expected session count will always be at least 2 in MARS mode:
// 1 for the main connection
// 1 for the verification command we just executed
int? observedSessions = null;
int expectedSessions = openMarsSessions + 2;
// For these tests, the expected request count will always be at least 1:
// 1 for the verification command we just executed
int? observedRequests = null;
int expectedRequests = openRequests + 1;
// There is a race between opening new sessions and them appearing in the DMV tables.
// As such, we want to poll the DMV a few times before declaring the wrong behavior was
// observed.
for (int attempt = 0; attempt < maxAttempts; attempt++)
{
(observedSessions, observedRequests) = QuerySessionCounters(connection);
if (observedSessions == expectedSessions && observedRequests == expectedRequests)
{
// We observed the expected values.
return;
}
// Back off and wait before trying again
Thread.SpinWait(20 << attempt);
}
// If we make it to here, we never saw the expected numbers, so fail the test with the
// last value we observed.
Assert.Equal(expectedSessions, observedSessions);
Assert.Equal(expectedRequests, observedRequests);
}
private static DisposableArray<SqlCommand> GetCommands(SqlConnection connection, CommandType commandType)
{
DisposableArray<SqlCommand> result = new(ConcurrentCommands);
for (int i = 0; i < result.Length; i++)
{
switch (commandType)
{
case CommandType.Text:
// 100 rows from an in-memory VALUES generator, repeated as 20 result sets.
// Produces enough output to span multiple 512-byte TDS packets (so the
// request stays observable in dm_exec_requests for the MARS tests) without
// the per-call CPU cost of scanning sys.databases on Azure SQL.
const string rowGen =
"SELECT a.n FROM (VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)) AS a(n) " +
"CROSS JOIN (VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)) AS b(n);";
string commandText = string.Join(" ", Enumerable.Repeat(rowGen, 20));
commandText += @" PRINT 'THIS IS THE END!'";
result[i] = new SqlCommand
{
CommandText = commandText,
CommandTimeout = 120,
CommandType = CommandType.Text,
Connection = connection
};
break;
case CommandType.StoredProcedure:
// sp_server_info returns a small, fixed result set and avoids the
// server-wide session scan that sp_who performs on shared Azure SQL.
result[i] = new SqlCommand
{
CommandText = "sp_server_info",
CommandTimeout = 120,
CommandType = CommandType.StoredProcedure,
Connection = connection
};
break;
default:
throw new InvalidOperationException("Not supported test type");
}
}
return result;
}
private static SqlConnection GetConnection()
{
// Generate a unique name for the application to ensure pool isolation between tests
string applicationName = $"SqlClientMarsPoolingTests:{Guid.NewGuid()}";
string connectionString = new SqlConnectionStringBuilder(DataTestUtility.TCPConnectionString)
{
ApplicationName = applicationName,
PacketSize = 512,
MaxPoolSize = 1,
MultipleActiveResultSets = true
}.ConnectionString;
return new SqlConnection(connectionString);
}
private static WeakReference OpenReaderThenNullify(SqlCommand command)
{
SqlDataReader? reader = command.ExecuteReader();
WeakReference weak = new WeakReference(reader);
reader = null;
return weak;
}
private static (int sessions, int requests) QuerySessionCounters(SqlConnection connection)
{
using SqlCommand verificationCommand = new SqlCommand();
verificationCommand.CommandText =
@"SELECT COUNT(*) AS SessionCount " +
@"FROM sys.dm_exec_connections " +
@"WHERE session_id=@@spid AND net_transport='Session'; " +
@"SELECT COUNT(*) AS RequestCount " +
@"FROM sys.dm_exec_requests " +
@"WHERE session_id=@@spid AND (status='running' OR status='suspended')";
verificationCommand.CommandType = CommandType.Text;
verificationCommand.Connection = connection;
// Result 1) Count of active sessions from sys.dm_exec_connections
using SqlDataReader reader = verificationCommand.ExecuteReader();
if (!reader.Read())
{
throw new Exception("Expected dm_exec_connections results from verification command");
}
int sessions = reader.GetInt32(0);
// Result 2) Count of active requests from sys.dm_exec_requests
if (!reader.NextResult() || !reader.Read())
{
throw new Exception("Expected dm_exec_requests results from verification command");
}
int requests = reader.GetInt32(0);
return (sessions, requests);
}
}
}