Skip to content

Parallel creation issue in ccompat #6143

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected GA getGA(String groupId, String artifactId) {
}

protected ArtifactVersionMetaDataDto createOrUpdateArtifact(String artifactId, String schema,
String artifactType, List<SchemaReference> references, String groupId) {
String artifactType, List<SchemaReference> references, String groupId, boolean normalize) {
ArtifactVersionMetaDataDto res;
final List<ArtifactReferenceDto> parsedReferences = parseReferences(references, groupId);
final List<ArtifactReference> artifactReferences = parsedReferences.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,36 @@
import io.apicurio.registry.utils.VersionUtil;
import jakarta.interceptor.Interceptors;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.InternalServerErrorException;

import java.math.BigInteger;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static io.apicurio.registry.logging.audit.AuditingConstants.KEY_ARTIFACT_ID;
import static io.apicurio.registry.logging.audit.AuditingConstants.KEY_VERSION;

@Interceptors({ ResponseErrorLivenessCheck.class, ResponseTimeoutReadinessCheck.class })
@Logged
public class SubjectsResourceImpl extends AbstractResource implements SubjectsResource {

private final Cache<GA, Lock> subjectLocks = CacheBuilder.newBuilder()
.expireAfterAccess(1, TimeUnit.HOURS) // Evict locks after 1 hour of inactivity
.maximumSize(10000) // Limit the cache size to 10,000 locks
.build();

@Override
@Authorized(style = AuthorizedStyle.None, level = AuthorizedLevel.Read)
public List<String> getSubjects(String subjectPrefix, Boolean deleted, String groupId) {
Expand Down Expand Up @@ -176,59 +190,72 @@ public SchemaId registerSchemaUnderSubject(String subject, Boolean normalize, St
final boolean fnormalize = normalize == null ? Boolean.FALSE : normalize;
final GA ga = getGA(groupId, subject);

// Check to see if this content is already registered - return the global ID of that content
// if it exists. If not, then register the new content.
long sid = -1;
boolean idFound = false;
if (null == request) {
throw new UnprocessableEntityException("The schema provided is null.");
}

final Map<String, TypedContent> resolvedReferences = resolveReferences(request.getReferences());

Lock lock;
try {
ArtifactVersionMetaDataDto dto = lookupSchema(ga.getRawGroupIdWithNull(), ga.getRawArtifactId(),
request.getSchema(), request.getReferences(), request.getSchemaType(), fnormalize);
if (dto.getState().equals(VersionState.DISABLED)) {
throw new ArtifactNotFoundException(ga.getRawGroupIdWithNull(), ga.getRawArtifactId());
}
sid = cconfig.legacyIdModeEnabled.get() ? dto.getGlobalId() : dto.getContentId();
idFound = true;
}
catch (ArtifactNotFoundException nfe) {
// This is OK - when it happens just move on and create
// Get or create a lock for the specific subject (GA) using the cache
// ReentrantLock::new is called only if the key 'ga' is not already present
lock = subjectLocks.get(ga, ReentrantLock::new);
} catch (ExecutionException e) {
// Handle exception during lock creation if necessary
log.error("Error creating lock for subject: " + ga, e.getCause());
throw new InternalServerErrorException("Error processing request", e.getCause());
}

if (!idFound) {
try {
ContentHandle schemaContent = ContentHandle.create(request.getSchema());
String contentType = ContentTypeUtil.determineContentType(schemaContent);
TypedContent typedSchemaContent = TypedContent.create(schemaContent, contentType);

// We validate the schema at creation time by inferring the type from the content
final String artifactType = ArtifactTypeUtil.determineArtifactType(typedSchemaContent, null,
resolvedReferences, factory);
if (request.getSchemaType() != null && !artifactType.equals(request.getSchemaType())) {
throw new UnprocessableEntityException(
String.format("Given schema is not from type: %s", request.getSchemaType()));
}
lock.lock(); // Acquire the lock

ArtifactVersionMetaDataDto artifactMeta = createOrUpdateArtifact(ga.getRawArtifactId(),
request.getSchema(), artifactType, request.getReferences(),
ga.getRawGroupIdWithNull());
sid = cconfig.legacyIdModeEnabled.get() ? artifactMeta.getGlobalId()
: artifactMeta.getContentId();
}
catch (InvalidArtifactTypeException ex) {
// If no artifact type can be inferred, throw invalid schema ex
throw new UnprocessableEntityException(ex.getMessage());
try {
final Map<String, TypedContent> resolvedReferences = resolveReferences(request.getReferences());
long sid = -1;

try {
// Try to find an existing, active version with the same content
ArtifactVersionMetaDataDto existingDto = lookupSchema(ga.getRawGroupIdWithNull(), ga.getRawArtifactId(),
request.getSchema(), request.getReferences(), request.getSchemaType(), fnormalize);

// lookupSchema throws ArtifactNotFoundException if not found or if the found version is DISABLED.
// If we reach here, an active version was found.
sid = cconfig.legacyIdModeEnabled.get() ? existingDto.getGlobalId() : existingDto.getContentId();

} catch (ArtifactNotFoundException nfe) {
// Schema not found or the only matching version is disabled, proceed to create/update
ArtifactVersionMetaDataDto newOrUpdatedDto = registerNewSchemaVersion(ga, request, fnormalize, resolvedReferences);
sid = cconfig.legacyIdModeEnabled.get() ? newOrUpdatedDto.getGlobalId() : newOrUpdatedDto.getContentId();
}

BigInteger id = converter.convertUnsigned(sid);
SchemaId schemaId = new SchemaId();
schemaId.setId(id.intValue());
return schemaId;
} finally {
lock.unlock(); // Release the lock in the finally block
}
}

BigInteger id = converter.convertUnsigned(sid);
SchemaId schemaId = new SchemaId();
schemaId.setId(id.intValue());
return schemaId;
/**
 * Registers the request's schema as a new (or updated) artifact version for the given subject.
 * <p>
 * The artifact type is inferred from the content itself, which doubles as schema validation;
 * if a schema type was declared in the request it must match the inferred type.
 *
 * @param ga                 group/artifact coordinates derived from the subject
 * @param request            the incoming register-schema request (schema text, type, references)
 * @param fnormalize         whether content normalization was requested by the caller
 * @param resolvedReferences the request's references already resolved to typed content
 * @return metadata of the created or updated artifact version
 * @throws UnprocessableEntityException if no artifact type can be inferred from the content,
 *                                      or the inferred type differs from the declared one
 */
private ArtifactVersionMetaDataDto registerNewSchemaVersion(GA ga, RegisterSchemaRequest request, boolean fnormalize, Map<String, TypedContent> resolvedReferences) {
    try {
        final ContentHandle rawContent = ContentHandle.create(request.getSchema());
        final TypedContent typedContent = TypedContent.create(rawContent,
                ContentTypeUtil.determineContentType(rawContent));

        // Inferring the type from the content validates the schema at creation time.
        final String inferredType = ArtifactTypeUtil.determineArtifactType(typedContent, null,
                resolvedReferences, factory);
        final String declaredType = request.getSchemaType();
        if (declaredType != null && !inferredType.equals(declaredType)) {
            throw new UnprocessableEntityException(
                    String.format("Given schema is not from type: %s", declaredType));
        }

        return createOrUpdateArtifact(ga.getRawArtifactId(), request.getSchema(), inferredType,
                request.getReferences(), ga.getRawGroupIdWithNull(), fnormalize);
    } catch (InvalidArtifactTypeException ex) {
        // No artifact type could be inferred, so the schema is considered invalid.
        throw new UnprocessableEntityException(ex.getMessage());
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

Expand All @@ -68,6 +74,7 @@
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -1733,4 +1740,69 @@ public void testSubjectCompatibilityAfterDeletingSubject() throws Exception {
"Top Compatibility Level Exists");

}

@Test
public void testConcurrentSchemaRegistration() throws Exception {
    final int numThreads = 10;
    final String subject = generateArtifactId();

    // Initial schema
    final String schema1 = "{\"type\":\"record\",\"namespace\":\"mynamespace\",\"name\":\"employee\"," +
            "\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"}]}";

    // Different schema (should create new version)
    final String schema3 = "{\"type\":\"record\",\"namespace\":\"mynamespace\",\"name\":\"employee\"," +
            "\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"}," +
            "{\"name\":\"age\",\"type\":\"int\"}]}";

    // Test Case 1: Initial concurrent registration of the same schema.
    // All threads should get the same ID since it's the same schema.
    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
    try {
        CountDownLatch latch = new CountDownLatch(numThreads);
        List<Future<Integer>> futures = new ArrayList<>();
        Set<Integer> uniqueIds = new HashSet<>();

        // Submit the same schema registration request multiple times concurrently.
        for (int i = 0; i < numThreads; i++) {
            futures.add(executorService.submit(() -> {
                // Rendezvous so all workers fire their request at (nearly) the same time;
                // Callable.call() declares Exception, so no catch-and-rethrow is needed here.
                latch.countDown();
                latch.await();
                return confluentClient.registerSchema(schema1, subject);
            }));
        }

        // Wait for all tasks to complete and collect the distinct IDs.
        for (Future<Integer> future : futures) {
            Integer id = future.get(5, TimeUnit.SECONDS);
            uniqueIds.add(id);
        }

        // All registrations of the same schema should return the same ID.
        assertEquals(1, uniqueIds.size(), "All registrations of the same schema should return the same ID");
        int firstId = uniqueIds.iterator().next();

        // Test Case 2: Register new version of schema.
        int thirdId = confluentClient.registerSchema(schema3, subject);
        assertNotEquals(firstId, thirdId, "Registration of different schema should create new version with different ID");

        // Verify the versions.
        List<Integer> versions = confluentClient.getAllVersions(subject);
        assertEquals(2, versions.size(), "Should have exactly 2 versions");

        // Verify schema contents (compare canonical forms so formatting differences don't matter).
        io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema1Response = confluentClient.getVersion(subject, 1);
        io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema2Response = confluentClient.getVersion(subject, 2);

        // First version should hold the initially registered schema.
        assertEquals(new AvroSchema(schema1).canonicalString(), new AvroSchema(schema1Response.getSchema()).canonicalString());
        // Second version should hold the different schema.
        assertEquals(new AvroSchema(schema3).canonicalString(), new AvroSchema(schema2Response.getSchema()).canonicalString());

        executorService.shutdown();
        assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS));
    } finally {
        // Ensure worker threads are not leaked if an assertion above fails;
        // no-op when the pool already terminated cleanly.
        executorService.shutdownNow();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static io.apicurio.tests.utils.Constants.SMOKE;
import static org.hamcrest.CoreMatchers.hasItems;
Expand Down Expand Up @@ -291,4 +297,52 @@ void testCreateDeleteSchemaRuleIsDeleted() throws Exception {
assertThat(1, is(confluentService.getAllSubjects().size()));
}

@Test
void testConcurrentSchemaRegistration() throws Exception {
    String subject = TestUtils.generateArtifactId();
    // Use a valid Avro record name instead of the subject UUID (Avro names cannot contain '-').
    String recordName = "TestRecord_" + subject.replace("-", "_");
    String schemaDefinition = "{\"type\":\"record\",\"name\":\"" + recordName + "\",\"fields\":[{\"name\":\"foo\",\"type\":\"string\"}]}";
    ParsedSchema schema = new AvroSchema(schemaDefinition);

    int numThreads = 50;
    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
    try {
        List<CompletableFuture<Integer>> futures = IntStream.range(0, numThreads)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
                    try {
                        // Clear the client-side cache so every worker issues a real HTTP request.
                        // NOTE(review): reset() is invoked concurrently on a shared client —
                        // presumably safe for this client implementation; confirm.
                        confluentService.reset();
                        return confluentService.register(subject, schema);
                    } catch (IOException | RestClientException e) {
                        throw new CompletionException(e);
                    }
                }, executorService))
                .collect(Collectors.toList());

        // Wait for all futures to complete and collect the returned IDs.
        List<Integer> results = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .get(); // Wait for completion and get the list of IDs

        // One result per worker; the ID must be positive, and every worker must see the same ID.
        assertEquals(numThreads, results.size());
        int firstId = results.get(0);
        assertThat("Registered schema ID should be positive", firstId > 0);
        results.forEach(id -> assertEquals(firstId, id.intValue(), "Concurrent registration resulted in different IDs"));

        // Verify only one version was actually created despite the racing requests.
        List<Integer> versions = confluentService.getAllVersions(subject);
        assertEquals(1, versions.size(), "Only one version should be created despite concurrent requests");
        assertEquals(1, versions.get(0).intValue()); // The version number should be 1

    } finally {
        executorService.shutdown();
        // Clean up: soft-delete then hard-delete the subject.
        // NOTE(review): if registration never succeeded, these calls may throw and
        // mask the original test failure — consider making the cleanup best-effort.
        confluentService.deleteSubject(subject, false);
        confluentService.deleteSubject(subject, true);
        waitForSubjectDeleted(subject);
    }
}
}
Loading