Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ConcurrentDictionary as default cache for Schema Registry client #2433

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 54 additions & 20 deletions src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ public class CachedSchemaRegistryClient : ISchemaRegistryClient
private int latestCacheTtlSecs;
private readonly ConcurrentDictionary<int, Schema> schemaById = new ConcurrentDictionary<int, Schema>();

private readonly Dictionary<string /*subject*/, Dictionary<Schema, int>> idBySchemaBySubject =
new Dictionary<string, Dictionary<Schema, int>>();
private readonly ConcurrentDictionary<string /*subject*/, ConcurrentDictionary<Schema, int>> idBySchemaBySubject =
new ConcurrentDictionary<string, ConcurrentDictionary<Schema, int>>();

private readonly Dictionary<string /*subject*/, Dictionary<int, RegisteredSchema>> schemaByVersionBySubject =
new Dictionary<string, Dictionary<int, RegisteredSchema>>();
private readonly ConcurrentDictionary<string /*subject*/, ConcurrentDictionary<int, RegisteredSchema>> schemaByVersionBySubject =
new ConcurrentDictionary<string, ConcurrentDictionary<int, RegisteredSchema>>();

private readonly Dictionary<string /*subject*/, Dictionary<Schema, RegisteredSchema>> registeredSchemaBySchemaBySubject =
new Dictionary<string, Dictionary<Schema, RegisteredSchema>>();
private readonly ConcurrentDictionary<string /*subject*/, ConcurrentDictionary<Schema, RegisteredSchema>> registeredSchemaBySchemaBySubject =
new ConcurrentDictionary<string, ConcurrentDictionary<Schema, RegisteredSchema>>();

private readonly MemoryCache latestVersionBySubject = new MemoryCache(new MemoryCacheOptions());

Expand Down Expand Up @@ -603,13 +603,21 @@ public Task<int> GetSchemaIdAsync(string subject, string avroSchema, bool normal
/// <inheritdoc/>
public async Task<int> GetSchemaIdAsync(string subject, Schema schema, bool normalize = false)
{
if (idBySchemaBySubject.TryGetValue(subject, out var idBySchema))
{
if (idBySchema.TryGetValue(schema, out int schemaId))
{
return schemaId;
}
}

await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
try
{
if (!this.idBySchemaBySubject.TryGetValue(subject, out Dictionary<Schema, int> idBySchema))
if (!this.idBySchemaBySubject.TryGetValue(subject, out idBySchema))
{
idBySchema = new Dictionary<Schema, int>();
this.idBySchemaBySubject.Add(subject, idBySchema);
idBySchema = new ConcurrentDictionary<Schema, int>();
this.idBySchemaBySubject.TryAdd(subject, idBySchema);
}

// TODO: The following could be optimized in the usual case where idBySchema only
Expand Down Expand Up @@ -640,13 +648,21 @@ public async Task<int> GetSchemaIdAsync(string subject, Schema schema, bool norm
/// <inheritdoc/>
public async Task<int> RegisterSchemaAsync(string subject, Schema schema, bool normalize = false)
{
if (idBySchemaBySubject.TryGetValue(subject, out var idBySchema))
{
if (idBySchema.TryGetValue(schema, out var schemaId))
{
return schemaId;
}
}

await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
try
{
if (!this.idBySchemaBySubject.TryGetValue(subject, out Dictionary<Schema, int> idBySchema))
if (!this.idBySchemaBySubject.TryGetValue(subject, out idBySchema))
{
idBySchema = new Dictionary<Schema, int>();
this.idBySchemaBySubject[subject] = idBySchema;
idBySchema = new ConcurrentDictionary<Schema, int>();
idBySchemaBySubject.TryAdd(subject, idBySchema);
}

// TODO: This could be optimized in the usual case where idBySchema only
Expand Down Expand Up @@ -703,13 +719,21 @@ private bool checkSchemaMatchesFormat(string format, string schemaString)
public async Task<RegisteredSchema> LookupSchemaAsync(string subject, Schema schema, bool ignoreDeletedSchemas,
bool normalize = false)
{
if (registeredSchemaBySchemaBySubject.TryGetValue(subject, out var registeredSchemaBySchema))
{
if (registeredSchemaBySchema.TryGetValue(schema, out var registeredSchema))
{
return registeredSchema;
}
}

await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
try
{
if (!registeredSchemaBySchemaBySubject.TryGetValue(subject, out var registeredSchemaBySchema))
if (!registeredSchemaBySchemaBySubject.TryGetValue(subject, out registeredSchemaBySchema))
{
CleanCacheIfFull();
registeredSchemaBySchema = new Dictionary<Schema, RegisteredSchema>();
registeredSchemaBySchema = new ConcurrentDictionary<Schema, RegisteredSchema>();
registeredSchemaBySchemaBySubject[subject] = registeredSchemaBySchema;
}
if (!registeredSchemaBySchema.TryGetValue(schema, out var registeredSchema))
Expand All @@ -729,10 +753,15 @@ public async Task<RegisteredSchema> LookupSchemaAsync(string subject, Schema sch
/// <inheritdoc/>
public async Task<Schema> GetSchemaAsync(int id, string format = null)
{
if (schemaById.TryGetValue(id, out var schema) && checkSchemaMatchesFormat(format, schema.SchemaString))
{
return schema;
}

await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
try
{
if (!this.schemaById.TryGetValue(id, out Schema schema) ||
if (!this.schemaById.TryGetValue(id, out schema) ||
!checkSchemaMatchesFormat(format, schema.SchemaString))
{
CleanCacheIfFull();
Expand All @@ -753,7 +782,7 @@ public async Task<Schema> GetSchemaAsync(int id, string format = null)
/// <inheritdoc/>
public async Task<Schema> GetSchemaBySubjectAndIdAsync(string subject, int id, string format = null)
{
if (this.schemaById.TryGetValue(id, out Schema schema) && checkSchemaMatchesFormat(format, schema.SchemaString))
if (this.schemaById.TryGetValue(id, out var schema) && checkSchemaMatchesFormat(format, schema.SchemaString))
{
return schema;
}
Expand Down Expand Up @@ -781,19 +810,24 @@ public async Task<Schema> GetSchemaBySubjectAndIdAsync(string subject, int id, s
/// <inheritdoc/>
public async Task<RegisteredSchema> GetRegisteredSchemaAsync(string subject, int version, bool ignoreDeletedSchemas = true)
{
if (schemaByVersionBySubject.TryGetValue(subject, out var schemaByVersion) &&
schemaByVersion.TryGetValue(version, out var schema))
{
return schema;
}

await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
try
{
CleanCacheIfFull();

if (!schemaByVersionBySubject.TryGetValue(subject,
out Dictionary<int, RegisteredSchema> schemaByVersion))
if (!schemaByVersionBySubject.TryGetValue(subject, out schemaByVersion))
{
schemaByVersion = new Dictionary<int, RegisteredSchema>();
schemaByVersion = new ConcurrentDictionary<int, RegisteredSchema>();
schemaByVersionBySubject[subject] = schemaByVersion;
}

if (!schemaByVersion.TryGetValue(version, out RegisteredSchema schema))
if (!schemaByVersion.TryGetValue(version, out schema))
{
schema = await restService.GetSchemaAsync(subject, version)
.ConfigureAwait(continueOnCapturedContext: false);
Expand Down