generated from amazon-archives/__template_Apache-2.0
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathExamplePreferred.cs
More file actions
94 lines (82 loc) · 3.66 KB
/
ExamplePreferred.cs
File metadata and controls
94 lines (82 loc) · 3.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using Amazon.AuroraDsql.Npgsql;
using Npgsql;
namespace Amazon.AuroraDsql.Npgsql.Examples;
/// <summary>
/// Preferred example: demonstrates pool creation, concurrent reads,
/// transactional writes, and cleanup using the DSQL connector.
///
/// Works with both admin and non-admin users:
/// - Admin users operate in the default "public" schema
/// - Non-admin users operate in a custom "myschema" schema
/// </summary>
public static class ExamplePreferred
{
private const int NumConcurrentQueries = 8;
public static async Task RunAsync(string clusterEndpoint, string clusterUser = "admin")
{
// Determine schema based on user type
var schema = clusterUser == "admin" ? "public" : "myschema";
// Create a connection pool via the connector
await using var ds = await AuroraDsql.CreateDataSourceAsync(new DsqlConfig
{
Host = clusterEndpoint,
User = clusterUser,
MaxPoolSize = 10,
MinPoolSize = 2,
OccMaxRetries = 3,
// Set search_path so all connections from the pool use the correct schema
ConfigureConnectionString = csb => csb.SearchPath = schema,
});
// Verify connectivity
await using (var conn = await ds.OpenConnectionAsync())
{
await using var cmd = new NpgsqlCommand("SELECT 1", conn);
await cmd.ExecuteScalarAsync();
}
Console.WriteLine("Connected to Aurora DSQL");
// Ensure the example table exists (DDL — retried on OCC conflict)
await ds.ExecWithRetryAsync(
"CREATE TABLE IF NOT EXISTS example_items (id UUID DEFAULT gen_random_uuid() PRIMARY KEY, name TEXT)");
// --- Concurrent reads ---
var tasks = new Task<string>[NumConcurrentQueries];
for (var i = 0; i < NumConcurrentQueries; i++)
{
var workerId = i + 1;
tasks[i] = Task.Run(async () =>
{
await using var conn = await ds.OpenConnectionAsync();
await using var cmd = new NpgsqlCommand("SELECT $1::int AS worker_id", conn);
cmd.Parameters.AddWithValue(workerId);
var result = await cmd.ExecuteScalarAsync();
return $"Worker {workerId} result: {result}";
});
}
var results = await Task.WhenAll(tasks);
foreach (var r in results)
Console.WriteLine(r);
Console.WriteLine("Concurrent reads completed");
// --- Transactional write (INSERT + COMMIT) with OCC retry ---
// WithTransactionRetryAsync manages BEGIN/COMMIT/ROLLBACK automatically.
// On OCC conflict, it rolls back and re-executes with a fresh connection.
await ds.WithTransactionRetryAsync(async conn =>
{
await using var insert = new NpgsqlCommand(
"INSERT INTO example_items (name) VALUES ($1)", conn);
insert.Parameters.AddWithValue("test-item");
await insert.ExecuteNonQueryAsync();
});
Console.WriteLine("Transactional write completed");
// --- Cleanup (DELETE) ---
await using (var conn = await ds.OpenConnectionAsync())
{
await using var delete = new NpgsqlCommand(
"DELETE FROM example_items WHERE name = $1", conn);
delete.Parameters.AddWithValue("test-item");
await delete.ExecuteNonQueryAsync();
}
Console.WriteLine("Cleanup completed");
Console.WriteLine("Connection pool with concurrent connections exercised successfully");
}
}