Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 49afa24

Browse files
committedDec 19, 2024·
Merge master
2 parents d63ca5f + aef7faa commit 49afa24

File tree

26 files changed

+432
-152
lines changed

26 files changed

+432
-152
lines changed
 

‎.github/pull_request_template.md

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<!--
2+
Suggested PR template: Fill/delete/add sections as needed. Optionally delete any commented block.
3+
-->
4+
What
5+
----
6+
<!--
7+
Briefly describe **what** you have changed and **why**.
8+
Optionally include implementation strategy.
9+
-->
10+
11+
Checklist
12+
------------------
13+
- [ ] Contains customer facing changes? Including API/behavior changes <!-- This can help identify if it has introduced any breaking changes -->
14+
- [ ] Did you add sufficient unit test and/or integration test coverage for this PR?
15+
- If not, please explain why it is not required
16+
17+
References
18+
----------
19+
JIRA:
20+
<!--
21+
Copy&paste links: to Jira ticket, other PRs, issues, Slack conversations...
22+
For code bumps: link to PR, tag or GitHub `/compare/master...master`
23+
-->
24+
25+
Test & Review
26+
------------
27+
<!--
28+
Has it been tested? how?
29+
Copy&paste any handy instructions, steps or requirements that can save time to the reviewer or any reader.
30+
-->
31+
32+
Open questions / Follow-ups
33+
--------------------------
34+
<!--
35+
Optional: anything open to discussion for the reviewer, out of scope, or follow-ups.
36+
-->
37+
38+
<!--
39+
Review stakeholders
40+
------------------
41+
<!--
42+
Optional: mention stakeholders or if special context that is required to review.
43+
-->

‎3RD_PARTY.md

+1
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ To add your project, open a pull request!
1010
- [Multi Schema Avro Deserializer](https://github.com/ycherkes/multi-schema-avro-desrializer) - Avro deserializer for reading messages serialized with multiple schemas.
1111
- [OpenSleigh.Transport.Kafka](https://github.com/mizrael/OpenSleigh/tree/develop/src/OpenSleigh.Transport.Kafka) - A Kafka Transport for OpenSleigh, a distributed saga management library.
1212
- [SlimMessageBus.Host.Kafka](https://github.com/zarusz/SlimMessageBus) - Apache Kafka transport for SlimMessageBus (lightweight message bus for .NET)
13+
- [Kafka Core](https://github.com/ffernandolima/confluent-kafka-core-dotnet) - Kafka Core empowers developers to build robust .NET applications on top of Confluent Kafka, focusing on simplicity, maintainability, and extensibility with intuitive abstractions and builders.

‎src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsClient.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public class AwsKmsClient : IKmsClient
1212
{
1313
private AmazonKeyManagementServiceClient kmsClient;
1414
private string keyId;
15-
15+
1616
public string KekId { get; }
1717

1818
public AwsKmsClient(string kekId, AWSCredentials credentials)
@@ -36,7 +36,7 @@ public AwsKmsClient(string kekId, AWSCredentials credentials)
3636

3737
public bool DoesSupport(string uri)
3838
{
39-
return uri.StartsWith(AwsKmsDriver.Prefix);
39+
return KekId == uri;
4040
}
4141

4242
public async Task<byte[]> Encrypt(byte[] plaintext)

‎src/Confluent.SchemaRegistry.Encryption.Aws/AwsKmsDriver.cs

+57-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
using System.Collections.Generic;
1+
using System;
2+
using System.Collections.Generic;
23
using Amazon.Runtime;
4+
using Amazon.Runtime.CredentialManagement;
35

46
namespace Confluent.SchemaRegistry.Encryption.Aws
57
{
@@ -13,20 +15,73 @@ public static void Register()
1315
public static readonly string Prefix = "aws-kms://";
1416
public static readonly string AccessKeyId = "access.key.id";
1517
public static readonly string SecretAccessKey = "secret.access.key";
16-
18+
public static readonly string Profile = "profile";
19+
public static readonly string RoleArn = "role.arn";
20+
public static readonly string RoleSessionName = "role.session.name";
21+
public static readonly string RoleExternalId = "role.external.id";
22+
1723
public string GetKeyUrlPrefix()
1824
{
1925
return Prefix;
2026
}
2127

2228
public IKmsClient NewKmsClient(IDictionary<string, string> config, string keyUrl)
2329
{
30+
config.TryGetValue(RoleArn, out string roleArn);
31+
if (roleArn == null)
32+
{
33+
roleArn = Environment.GetEnvironmentVariable("AWS_ROLE_ARN");
34+
}
35+
config.TryGetValue(RoleSessionName, out string roleSessionName);
36+
if (roleSessionName == null)
37+
{
38+
roleSessionName = Environment.GetEnvironmentVariable("AWS_ROLE_SESSION_NAME");
39+
}
40+
config.TryGetValue(RoleExternalId, out string roleExternalId);
41+
if (roleExternalId == null)
42+
{
43+
roleExternalId = Environment.GetEnvironmentVariable("AWS_ROLE_EXTERNAL_ID");
44+
}
2445
AWSCredentials credentials = null;
2546
if (config.TryGetValue(AccessKeyId, out string accessKeyId)
2647
&& config.TryGetValue(SecretAccessKey, out string secretAccessKey))
2748
{
2849
credentials = new BasicAWSCredentials(accessKeyId, secretAccessKey);
2950
}
51+
else if (config.TryGetValue(Profile, out string profile))
52+
{
53+
var credentialProfileStoreChain = new CredentialProfileStoreChain();
54+
if (credentialProfileStoreChain.TryGetAWSCredentials(
55+
profile, out AWSCredentials creds))
56+
credentials = creds;
57+
}
58+
if (credentials == null)
59+
{
60+
credentials = FallbackCredentialsFactory.GetCredentials();
61+
}
62+
if (roleArn != null)
63+
{
64+
if (string.IsNullOrEmpty(roleExternalId))
65+
{
66+
credentials = new AssumeRoleAWSCredentials(
67+
credentials,
68+
roleArn,
69+
roleSessionName ?? "confluent-encrypt");
70+
}
71+
else
72+
{
73+
var options = new AssumeRoleAWSCredentialsOptions
74+
{
75+
ExternalId = roleExternalId
76+
};
77+
78+
credentials = new AssumeRoleAWSCredentials(
79+
credentials,
80+
roleArn,
81+
roleSessionName ?? "confluent-encrypt",
82+
options);
83+
}
84+
}
3085
return new AwsKmsClient(keyUrl, credentials);
3186
}
3287
}

‎src/Confluent.SchemaRegistry.Encryption.Aws/Confluent.SchemaRegistry.Encryption.Aws.csproj

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
</ItemGroup>
3030

3131
<ItemGroup>
32-
<PackageReference Include="AWSSDK.KeyManagementService" Version="3.7.302.19" />
32+
<PackageReference Include="AWSSDK.KeyManagementService" Version="3.7.400.61" />
33+
<PackageReference Include="AWSSDK.SecurityToken" Version="3.7.401.10" />
3334
</ItemGroup>
3435

3536
<ItemGroup>

‎src/Confluent.SchemaRegistry.Encryption.Azure/AzureKmsClient.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public AzureKmsClient(string kekId, TokenCredential tokenCredential)
2525

2626
public bool DoesSupport(string uri)
2727
{
28-
return uri.StartsWith(AzureKmsDriver.Prefix);
28+
return KekId == uri;
2929
}
3030

3131
public async Task<byte[]> Encrypt(byte[] plaintext)

‎src/Confluent.SchemaRegistry.Encryption.Gcp/GcpKmsClient.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public GcpKmsClient(string kekId, GoogleCredential credential)
3636

3737
public bool DoesSupport(string uri)
3838
{
39-
return uri.StartsWith(GcpKmsDriver.Prefix);
39+
return KekId == uri;
4040
}
4141

4242
public async Task<byte[]> Encrypt(byte[] plaintext)

‎src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsClient.cs

+1-6
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,6 @@ public class HcVaultKmsClient : IKmsClient
2121

2222
public HcVaultKmsClient(string kekId, string ns, string tokenId)
2323
{
24-
if (tokenId == null)
25-
{
26-
tokenId = Environment.GetEnvironmentVariable("VAULT_TOKEN");
27-
ns = Environment.GetEnvironmentVariable("VAULT_NAMESPACE");
28-
}
2924
KekId = kekId;
3025
Namespace = ns;
3126
TokenId = tokenId;
@@ -53,7 +48,7 @@ public HcVaultKmsClient(string kekId, string ns, string tokenId)
5348

5449
public bool DoesSupport(string uri)
5550
{
56-
return uri.StartsWith(HcVaultKmsDriver.Prefix);
51+
return KekId == uri;
5752
}
5853

5954
public async Task<byte[]> Encrypt(byte[] plaintext)

‎src/Confluent.SchemaRegistry.Encryption.HcVault/HcVaultKmsDriver.cs

+5
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ public IKmsClient NewKmsClient(IDictionary<string, string> config, string keyUrl
2323
{
2424
config.TryGetValue(TokenId, out string tokenId);
2525
config.TryGetValue(Namespace, out string ns);
26+
if (tokenId == null)
27+
{
28+
tokenId = Environment.GetEnvironmentVariable("VAULT_TOKEN");
29+
ns = Environment.GetEnvironmentVariable("VAULT_NAMESPACE");
30+
}
2631
return new HcVaultKmsClient(keyUrl, ns, tokenId);
2732
}
2833
}

‎src/Confluent.SchemaRegistry.Encryption/LocalKmsClient.cs

-8
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,6 @@ public class LocalKmsClient : IKmsClient
1616

1717
public LocalKmsClient(string secret)
1818
{
19-
if (secret == null)
20-
{
21-
secret = Environment.GetEnvironmentVariable("LOCAL_SECRET");
22-
}
23-
if (secret == null)
24-
{
25-
throw new ArgumentNullException("Cannot load secret");
26-
}
2719
Secret = secret;
2820
cryptor = new Cryptor(DekFormat.AES128_GCM);
2921
byte[] rawKey = Hkdf.DeriveKey(

‎src/Confluent.SchemaRegistry.Encryption/LocalKmsDriver.cs

+8
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ public string GetKeyUrlPrefix()
2222
public IKmsClient NewKmsClient(IDictionary<string, string> config, string keyUrl)
2323
{
2424
config.TryGetValue(Secret, out string secret);
25+
if (secret == null)
26+
{
27+
secret = Environment.GetEnvironmentVariable("LOCAL_SECRET");
28+
}
29+
if (secret == null)
30+
{
31+
throw new ArgumentNullException("Cannot load secret");
32+
}
2533
return new LocalKmsClient(secret);
2634
}
2735
}

‎src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs

+12-36
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,10 @@ namespace Confluent.SchemaRegistry.Serdes
3131
{
3232
internal class GenericSerializerImpl : AsyncSerializer<GenericRecord, Avro.Schema>
3333
{
34-
private Dictionary<Avro.Schema, string> knownSchemas = new Dictionary<global::Avro.Schema, string>();
35-
private HashSet<KeyValuePair<string, string>> registeredSchemas = new HashSet<KeyValuePair<string, string>>();
36-
private Dictionary<string, int> schemaIds = new Dictionary<string, int>();
34+
private Dictionary<Avro.Schema, string> knownSchemas =
35+
new Dictionary<global::Avro.Schema, string>();
36+
private Dictionary<KeyValuePair<string, string>, int> registeredSchemas =
37+
new Dictionary<KeyValuePair<string, string>, int>();
3738

3839
public GenericSerializerImpl(
3940
ISchemaRegistryClient schemaRegistryClient,
@@ -99,12 +100,10 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
99100
// something more sophisticated than the below + not allow
100101
// the misuse to keep happening without warning.
101102
if (knownSchemas.Count > schemaRegistryClient.MaxCachedSchemas ||
102-
registeredSchemas.Count > schemaRegistryClient.MaxCachedSchemas ||
103-
schemaIds.Count > schemaRegistryClient.MaxCachedSchemas)
103+
registeredSchemas.Count > schemaRegistryClient.MaxCachedSchemas)
104104
{
105105
knownSchemas.Clear();
106106
registeredSchemas.Clear();
107-
schemaIds.Clear();
108107
}
109108

110109
// Determine a schema string corresponding to the schema object.
@@ -139,41 +138,18 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
139138
{
140139
schemaId = latestSchema.Id;
141140
}
142-
else if (!registeredSchemas.Contains(subjectSchemaPair))
141+
else if (!registeredSchemas.TryGetValue(subjectSchemaPair, out schemaId))
143142
{
144-
int newSchemaId;
145-
146143
// first usage: register/get schema to check compatibility
147-
if (autoRegisterSchema)
148-
{
149-
newSchemaId = await schemaRegistryClient
144+
schemaId = autoRegisterSchema
145+
? await schemaRegistryClient
150146
.RegisterSchemaAsync(subject, writerSchemaString, normalizeSchemas)
147+
.ConfigureAwait(continueOnCapturedContext: false)
148+
: await schemaRegistryClient
149+
.GetSchemaIdAsync(subject, writerSchemaString, normalizeSchemas)
151150
.ConfigureAwait(continueOnCapturedContext: false);
152-
}
153-
else
154-
{
155-
newSchemaId = await schemaRegistryClient.GetSchemaIdAsync(subject, writerSchemaString, normalizeSchemas)
156-
.ConfigureAwait(continueOnCapturedContext: false);
157-
}
158-
159-
if (!schemaIds.ContainsKey(writerSchemaString))
160-
{
161-
schemaIds.Add(writerSchemaString, newSchemaId);
162-
}
163-
else if (schemaIds[writerSchemaString] != newSchemaId)
164-
{
165-
schemaIds.Clear();
166-
registeredSchemas.Clear();
167-
throw new KafkaException(new Error(isKey ? ErrorCode.Local_KeySerialization : ErrorCode.Local_ValueSerialization, $"Duplicate schema registration encountered: Schema ids {schemaIds[writerSchemaString]} and {newSchemaId} are associated with the same schema."));
168-
}
169-
170-
registeredSchemas.Add(subjectSchemaPair);
171151

172-
schemaId = schemaIds[writerSchemaString];
173-
}
174-
else
175-
{
176-
schemaId = schemaIds[writerSchemaString];
152+
registeredSchemas.Add(subjectSchemaPair, schemaId);
177153
}
178154
}
179155
finally
There was a problem loading the remainder of the diff.

0 commit comments

Comments
 (0)
Please sign in to comment.