diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java index 0e5d14b1..e65da7a9 100644 --- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java +++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java @@ -30,8 +30,10 @@ import java.time.Duration; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -48,6 +50,7 @@ import org.apache.sling.distribution.InvalidationProcessException; import org.apache.sling.distribution.InvalidationProcessor; import org.apache.sling.distribution.common.DistributionException; +import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.impl.event.DistributionFailureEvent; import org.apache.sling.distribution.journal.messages.LogMessage; import org.apache.sling.distribution.journal.messages.PackageMessage; @@ -78,6 +81,14 @@ * The clustered and non clustered publish instances use * cases can be supported by only running the Subscriber * agent on the leader instance. + * + * + * The BookKeeper supports the concurrent handling of packages; for that it + * keeps track of messages which are submitted to importPackages() or + * skipPackages(), as both can take longer to being processed. These in-flight messages + * are stored in the messagesBeingProcessed set; an offsets is only persisted + * if this offset is the lowest offset of all messages in that set. + * */ public class BookKeeper { public static final String STORE_TYPE_STATUS = "statuses"; @@ -107,6 +118,9 @@ public class BookKeeper { private final ImportPostProcessor importPostProcessor; private final InvalidationProcessor invalidationProcessor; private int skippedCounter = 0; + + private Set messagesBeingProcessed = new HashSet<>(); + public BookKeeper(ResourceResolverFactory resolverFactory, SubscriberMetrics subscriberMetrics, PackageHandler packageHandler, EventAdmin eventAdmin, Consumer sender, Consumer logSender, @@ -152,10 +166,12 @@ public BookKeeper(ResourceResolverFactory resolverFactory, SubscriberMetrics sub * failing. For those packages importers, we aim at processing packages at least * once, thanks to the order in which the content updates are applied. */ - public void importPackage(PackageMessage pkgMsg, long offset, Date createdTime, Date importStartTime) throws DistributionException { + public void importPackage(PackageMessage pkgMsg, MessageInfo message, Date createdTime, Date importStartTime) throws DistributionException { + long offset = message.getOffset(); log.debug("Importing distribution package {} at offset={}", pkgMsg, offset); try (Timer.Context context = subscriberMetrics.getImportedPackageDuration().time(); ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) { + recordMessageProcessingStart(message); // Execute the pre-processor preProcess(pkgMsg); subscriberMetrics.setCurrentImport(new CurrentImportInfo(pkgMsg, offset, importStartTime.getTime())); @@ -165,6 +181,7 @@ public void importPackage(PackageMessage pkgMsg, long offset, Date createdTime, } storeOffset(importerResolver, offset); importerResolver.commit(); + recordMessageProcessingCompleted(message); subscriberMetrics.getImportedPackageSize().update(pkgMsg.getPkgLength()); subscriberMetrics.getPackageDistributedDuration().update((currentTimeMillis() - createdTime.getTime()), TimeUnit.MILLISECONDS); @@ -184,10 +201,20 @@ public void importPackage(PackageMessage pkgMsg, long offset, Date createdTime, subscriberMetrics.clearCurrentImport(); } } + + private synchronized void recordMessageProcessingStart(MessageInfo message) { + messagesBeingProcessed.add(message); + } + + private synchronized void recordMessageProcessingCompleted(MessageInfo message) { + messagesBeingProcessed.remove(message); + } - public void invalidateCache(PackageMessage pkgMsg, long offset, Date createdTime, Date importStartTime) throws DistributionException { + public void invalidateCache(PackageMessage pkgMsg, MessageInfo message, Date createdTime, Date importStartTime) throws DistributionException { + long offset = message.getOffset(); log.debug("Invalidating the cache for the package {} at offset={}", pkgMsg, offset); try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) { + recordMessageProcessingStart(message); Map props = this.buildProcessorPropertiesFromMessage(pkgMsg); long invalidationStartTime = currentTimeMillis(); @@ -201,6 +228,7 @@ public void invalidateCache(PackageMessage pkgMsg, long offset, Date createdTime storeOffset(resolver, offset); resolver.commit(); + recordMessageProcessingCompleted(message); clearPackageRetriesOnSuccess(pkgMsg); @@ -446,8 +474,27 @@ private void storeStatus(ResourceResolver resolver, PackageStatus packageStatus) log.info("Stored status {}", statusMap); } - private void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException { - processedOffsets.store(resolver, KEY_OFFSET, offset); + /** + * Store the provided offset in the repository. This offset is only processed if it has the smallest + * offset of all entries of the messagesBeingImported set; that indicates that all messages with lower offsets + * have already been processed, and that it's safe now to mark this offset as the latest persisted one. + * @param offset the offset to persist + * @throws PersistenceException + */ + private synchronized void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException { + long smallestOffset = Long.MAX_VALUE; + if (messagesBeingProcessed.isEmpty()) { + smallestOffset = offset; // we have to store the offset if no other message is being processed concurrently + } else { + for (MessageInfo mi: messagesBeingProcessed) { + if (mi.getOffset() < smallestOffset) { + smallestOffset = mi.getOffset(); + } + } + } + if (smallestOffset == offset) { + processedOffsets.store(resolver, KEY_OFFSET, offset); + } } private ResourceResolver getServiceResolver(String subService) throws LoginException { diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java index bda2a83f..463b0699 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java @@ -37,8 +37,12 @@ import java.util.Date; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -134,6 +138,8 @@ public class DistributionSubscriber { private final Delay delay = new Delay(); private AtomicReference state = new AtomicReference(DistributionAgentState.IDLE); + private ExecutorService importExecutor; + @Activate public DistributionSubscriber( @Reference(name = "packageBuilder") DistributionPackageBuilder packageBuilder, @@ -192,15 +198,29 @@ public DistributionSubscriber( String assign = startOffset > 0 ? messagingProvider.assignTo(startOffset) : null; packagePoller = messagingProvider.createPoller(Topics.PACKAGE_TOPIC, Reset.latest, assign, - HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage), HandlerAdapter.create(OffsetMessage.class, this::handleOffsetMessage)); + HandlerAdapter.create(PackageMessage.class, this::delegatePackageMessageToExecutor), HandlerAdapter.create(OffsetMessage.class, this::handleOffsetMessage)); int announceDelay = Converters.standardConverter().convert(properties.get("announceDelay")).defaultValue(10000).to(Integer.class); announcer = new Announcer(subSlingId, subAgentName, queueNames, messagingProvider.createSender(Topics.DISCOVERY_TOPIC), bookKeeper, config.maxRetries(), config.editable(), announceDelay); + + importExecutor = Executors.newFixedThreadPool(config.concurrentImporterThreads(), new ThreadFactory() { - LOG.info("Started Subscriber agent={} at offset={}, subscribed to agent names {}, readyCheck={}", subAgentName, startOffset, - queueNames, config.subscriberIdleCheck()); + AtomicInteger id = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + int no = id.incrementAndGet(); + Thread t = new Thread(r); + t.setName("DistributionSubscriber-importer-" + no); + return t; + } + }); + + LOG.info("Started Subscriber agent={} at offset={}, subscribed to agent names {}, readyCheck={}, concurrent importer threads={}", + subAgentName, startOffset, queueNames, config.subscriberIdleCheck(), + config.concurrentImporterThreads()); } private String getFirst(String[] agentNames) { @@ -235,6 +255,18 @@ public void deactivate() { IOUtils.closeQuietly(announcer, packagePoller, idleReadyCheck, idleCheck, commandPoller); running = false; + if (importExecutor != null) { + importExecutor.shutdown(); + try { + if (!importExecutor.awaitTermination(1, MINUTES)) { + importExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.error("Caught an unexpected ThreadInterrupted exception during shutdown, interrupting thread now",e); + Thread.currentThread().interrupt(); + importExecutor.shutdownNow(); + } + } LOG.info("Stopped Subscriber agent {}, subscribed to Publisher agent names {} with package builder {}", subAgentName, queueNames, pkgType); } @@ -244,6 +276,13 @@ public DistributionAgentState getState() { return (isBlocked) ? DistributionAgentState.BLOCKED : state.get(); } + /** + * Delegates the rest of the execution into a thread of the executor for async execution + */ + private void delegatePackageMessageToExecutor(MessageInfo info, PackageMessage message) { + importExecutor.submit(() -> handlePackageMessage(info, message)); + } + private void handlePackageMessage(MessageInfo info, PackageMessage message) { boolean done = false; while (!done && running) { @@ -332,9 +371,9 @@ private void processPackageMessage(MessageInfo info, PackageMessage pkgMsg) if (skip) { bookKeeper.removePackage(pkgMsg, info.getOffset()); } else if (type == INVALIDATE) { - bookKeeper.invalidateCache(pkgMsg, info.getOffset(), createdTime, importStartTime); + bookKeeper.invalidateCache(pkgMsg, info, createdTime, importStartTime); } else { - bookKeeper.importPackage(pkgMsg, info.getOffset(), createdTime, importStartTime); + bookKeeper.importPackage(pkgMsg, info, createdTime, importStartTime); } blockingSendStoredStatus(); } finally { diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java index bcf2e0bf..49407982 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java @@ -70,4 +70,7 @@ @AttributeDefinition(description = "Number of ms to wait before retrying to process a package.") int acceptableAgeDiffMs() default 120 * 1000; + + @AttributeDefinition(description = "Number of threads importing packages (default 1)") + int concurrentImporterThreads() default 1; } diff --git a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java index e297bc57..f072e2be 100644 --- a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java @@ -27,10 +27,17 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; import java.time.Duration; +import java.util.Collections; import java.util.Date; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.apache.sling.api.resource.LoginException; @@ -44,12 +51,14 @@ import org.apache.sling.distribution.ImportPreProcessor; import org.apache.sling.distribution.InvalidationProcessor; import org.apache.sling.distribution.common.DistributionException; +import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.messages.LogMessage; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.apache.sling.testing.mock.osgi.junit.OsgiContext; import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -139,7 +148,7 @@ public void testOnlyEveryTenthSkippedPackageOffsetStored() throws InterruptedExc public void testPackageImport() throws DistributionException { try { Date createdTime = new Date(currentTimeMillis()); - bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, createdTime, createdTime); + bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), buildMessageInfo(10), createdTime, createdTime); } finally { assertThat(bookKeeper.getRetries(PUB_AGENT_NAME), equalTo(0)); } @@ -155,7 +164,7 @@ public void testPackageBlockingImportErrorMetric() throws DistributionException, for (int c=0; c< BookKeeper.NUM_ERRORS_BLOCKING + 1; c++) { try { Date createdTime = new Date(currentTimeMillis()); - bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, createdTime, createdTime); + bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), buildMessageInfo(10), createdTime, createdTime); } catch (DistributionException e) { } } @@ -180,7 +189,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { }).when(packageHandler).apply(Mockito.any(ResourceResolver.class), Mockito.any(PackageMessage.class)); Date simulatedStartTime = new Date( currentTimeMillis() - Duration.ofMinutes(6).toMillis( )); - bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, simulatedStartTime, simulatedStartTime); + bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), buildMessageInfo(10), simulatedStartTime, simulatedStartTime); assertThat(subscriberMetrics.getCurrentImportDuration(), equalTo(0L)); } @@ -202,7 +211,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { }).when(packageHandler).apply(Mockito.any(ResourceResolver.class), Mockito.any(PackageMessage.class)); Date simulatedStartTime = new Date( currentTimeMillis() - Duration.ofMinutes(1).toMillis()); - bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, new Date(currentTimeMillis()), simulatedStartTime); + bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), buildMessageInfo(10), new Date(currentTimeMillis()), simulatedStartTime); assertThat(subscriberMetrics.getCurrentImportDuration(), equalTo(0L)); } @@ -211,7 +220,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { public void testCacheInvalidation() throws DistributionException { try { Date simulatedStartTime = new Date( currentTimeMillis() - Duration.ofMinutes(1).toMillis()); - bookKeeper.invalidateCache(buildPackageMessage(PackageMessage.ReqType.INVALIDATE), 10L, simulatedStartTime, simulatedStartTime); + bookKeeper.invalidateCache(buildPackageMessage(PackageMessage.ReqType.INVALIDATE), buildMessageInfo(10), simulatedStartTime, simulatedStartTime); } finally { assertThat(bookKeeper.getRetries(PUB_AGENT_NAME), equalTo(0)); } @@ -227,7 +236,98 @@ public void testClearOffsetHandling() { assertThat("Should be null", offset2, equalTo(newOffset)); } + /** + * Verifies that concurrent importPackage() calls only persist an offset when all messages + * with lower offsets have completed. Uses CountDownLatches to control packageHandler.apply() + * completion order without relying on timing, so the test is deterministic. + */ + @Test + public void testConcurrentImportStoresOffsetOnlyWhenAllLowerOffsetsCompleted() + throws InterruptedException, DistributionException { + final int count = 4; + final long timeoutSeconds = 2; + + // Latches to block each apply() until we release it (keyed by offset) + final CountDownLatch[] startLatches = new CountDownLatch[count]; + final CountDownLatch[] doneLatches = new CountDownLatch[count]; + for (int i = 0; i < count; i++) { + startLatches[i] = new CountDownLatch(1); + doneLatches[i] = new CountDownLatch(1); + } + final CountDownLatch allEnteredApply = new CountDownLatch(count); + + doAnswer(invocation -> { + PackageMessage pkgMsg = invocation.getArgument(1); + String pkgId = pkgMsg.getPkgId(); + int offset = Integer.parseInt(pkgId.replace("pkg-", "")); + allEnteredApply.countDown(); + startLatches[offset].await(); // only continue when the latch is counted down + return null; + }).when(packageHandler).apply(any(ResourceResolver.class), any(PackageMessage.class)); + + ExecutorService executor = Executors.newFixedThreadPool(count); + try { + for (int offset = 0; offset < count; offset++) { + final int off = offset; + executor.submit(() -> { + try { + bookKeeper.importPackage( + buildPackageMessage(PackageMessage.ReqType.ADD, (long) off), + buildMessageInfo(off), + new Date(), + new Date()); + } catch (DistributionException e) { + throw new AssertionError("importPackage failed for offset " + off, e); + } finally { + doneLatches[off].countDown(); + } + }); + } + + Assert.assertTrue("All threads should enter apply()", + allEnteredApply.await(timeoutSeconds, TimeUnit.SECONDS)); + + // No import has completed yet -> offset must still be -1 + assertThat(bookKeeper.loadOffset(), equalTo(-1L)); + + // Complete offset 2 first. Stored offset must not advance (0,1 still in flight). + startLatches[2].countDown(); + Assert.assertTrue("Offset 2 import should complete", + doneLatches[2].await(timeoutSeconds, TimeUnit.SECONDS)); + assertThat(bookKeeper.loadOffset(), equalTo(-1L)); + + // Complete offset 0. storeOffset(0) runs while set still has {0,1,3}; smallest=0 -> store 0. + startLatches[0].countDown(); + Assert.assertTrue("Offset 0 import should complete", + doneLatches[0].await(timeoutSeconds, TimeUnit.SECONDS)); + assertThat(bookKeeper.loadOffset(), equalTo(0L)); + + // Complete offset 3. Stored offset must stay 0 (offset 1 still in flight). + startLatches[3].countDown(); + Assert.assertTrue("Offset 3 import should complete", + doneLatches[3].await(timeoutSeconds, TimeUnit.SECONDS)); + assertThat(bookKeeper.loadOffset(), equalTo(0L)); + + // Complete offset 1. Now 1 is the only one left in set -> store 1. + startLatches[1].countDown(); + Assert.assertTrue("Offset 1 import should complete", + doneLatches[1].await(timeoutSeconds, TimeUnit.SECONDS)); + assertThat(bookKeeper.loadOffset(), equalTo(1L)); + } finally { + executor.shutdown(); + executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS); + } + } + PackageMessage buildPackageMessage(PackageMessage.ReqType reqType) { + return buildPackageMessage(reqType, null); + } + + /** + * Build a package message with an optional offset encoded in pkgId for tests that need to + * identify the message (e.g. "pkg-2" for offset 2). If offset is null, a random UUID is used. + */ + PackageMessage buildPackageMessage(PackageMessage.ReqType reqType, Long offsetForPkgId) { PackageMessage msg = mock(PackageMessage.class); when(msg.getPkgLength()) .thenReturn(100L); @@ -238,8 +338,39 @@ PackageMessage buildPackageMessage(PackageMessage.ReqType reqType) { when(msg.getPaths()) .thenReturn(singletonList("/content")); when(msg.getPkgId()) - .thenReturn(UUID.randomUUID().toString()); + .thenReturn(offsetForPkgId != null ? "pkg-" + offsetForPkgId : UUID.randomUUID().toString()); return msg; } + + MessageInfo buildMessageInfo(long offset) { + return new MessageInfo() { + + @Override + public String getTopic() { + return "testTopic"; + } + + @Override + public int getPartition() { + return 0; + } + + @Override + public long getOffset() { + return offset; + } + + @Override + public long getCreateTime() { + return System.currentTimeMillis(); + } + + @Override + public Map getProps() { + return Collections.emptyMap(); + } + + }; + } } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriberConcurrentTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriberConcurrentTest.java new file mode 100644 index 00000000..c83da8a7 --- /dev/null +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriberConcurrentTest.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.impl.subscriber; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.sling.distribution.agent.DistributionAgentState.IDLE; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import org.apache.sling.api.resource.ResourceResolver; +import org.apache.sling.api.resource.ResourceResolverFactory; +import org.apache.sling.commons.metrics.MetricsService; +import org.apache.sling.distribution.ImportPostProcessor; +import org.apache.sling.distribution.ImportPreProcessor; +import org.apache.sling.distribution.journal.BinaryStore; +import org.apache.sling.distribution.journal.HandlerAdapter; +import org.apache.sling.distribution.journal.MessageHandler; +import org.apache.sling.distribution.journal.MessageInfo; +import org.apache.sling.distribution.journal.MessagingProvider; +import org.apache.sling.distribution.journal.Reset; +import org.apache.sling.distribution.journal.bookkeeper.BookKeeper; +import org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory; +import org.apache.sling.distribution.journal.bookkeeper.LocalStore; +import org.apache.sling.distribution.journal.impl.precondition.Precondition; +import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision; +import org.apache.sling.distribution.journal.messages.DiscoveryMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage; +import org.apache.sling.distribution.journal.shared.NoOpImportPostProcessor; +import org.apache.sling.distribution.journal.shared.NoOpImportPreProcessor; +import org.apache.sling.distribution.journal.shared.OnlyOnLeader; +import org.apache.sling.distribution.journal.shared.TestMessageInfo; +import org.apache.sling.distribution.journal.shared.Topics; +import org.apache.sling.distribution.packaging.DistributionPackage; +import org.apache.sling.distribution.packaging.DistributionPackageBuilder; +import org.apache.sling.settings.SlingSettingsService; +import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import org.osgi.framework.BundleContext; +import org.osgi.service.event.EventAdmin; +import org.osgi.util.converter.Converters; + +/** + * Tests concurrent import of packages by DistributionSubscriber with multiple importer threads. + * Uses explicit synchronization (CountDownLatch) so the test does not depend on timing. + */ +@RunWith(MockitoJUnitRunner.class) +public class DistributionSubscriberConcurrentTest { + + private static final String SUB_SLING_ID = "subsling"; + private static final String SUB_AGENT_NAME = "subagent"; + private static final String PUB_AGENT_NAME = "pubagent"; + private static final String STORE_PACKAGE_NODE_NAME = "myserver.apache.org_somepath_package"; + private static final int NUM_MESSAGES = 4; + /** Use at least 2 and enough threads so all messages can enter the handler (block in mock) at once. */ + private static final int IMPORT_THREADS = 4; + private static final long AWAIT_SECONDS = 30; + + @Mock + private BundleContext context; + @Mock + private DistributionPackageBuilder packageBuilder; + @Mock + private Precondition precondition; + @Mock + private SlingSettingsService slingSettings; + @Mock + private BinaryStore binaryStore; + @Spy + private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory(); + @Mock + private MessagingProvider clientProvider; + @Mock + private EventAdmin eventAdmin; + @Mock + private org.apache.sling.distribution.journal.MessageSender discoverySender; + @Mock + private org.apache.sling.distribution.journal.MessageSender statusSender; + @Spy + private MetricsService metricsService = MetricsService.NOOP; + @Spy + private ImportPreProcessor importPreProcessor = new NoOpImportPreProcessor(); + @Spy + private ImportPostProcessor importPostProcessor = new NoOpImportPostProcessor(); + @Spy + private SubscriberReadyStore subscriberReadyStore = new SubscriberReadyStore(); + @InjectMocks + private BookKeeperFactory bookKeeperFactory; + + private DistributionSubscriber subscriber; + private MessageHandler packageHandler; + + @Captor + private ArgumentCaptor> packageCaptor; + + @Before + public void before() throws URISyntaxException { + when(packageBuilder.getType()).thenReturn("journal"); + when(slingSettings.getSlingId()).thenReturn(SUB_SLING_ID); + URI serverURI = new URI("http://myserver.apache.org:1234/somepath"); + when(clientProvider.getServerUri()).thenReturn(serverURI); + when(clientProvider.createSender(Topics.STATUS_TOPIC)).thenReturn(statusSender); + when(clientProvider.createSender(Topics.DISCOVERY_TOPIC)).thenReturn(discoverySender); + } + + @After + public void after() { + if (subscriber != null) { + subscriber.deactivate(); + } + } + + /** + * Submits multiple package messages (ADD, ADD, DELETE, ADD) with at least 2 importer threads. + * Uses latches so that: (1) we know when all messages have entered the handler, + * (2) we release them in a chosen order (2, 0, 3, 1) to validate that the stored offset + * only advances when all lower offsets have completed (no timing assumptions). + */ + @Test + public void testConcurrentImportMultiplePackageTypes() throws Exception { + assumePreconditionAccept(); + initSubscriberWithConcurrentThreads(IMPORT_THREADS); + + CountDownLatch allEntered = new CountDownLatch(NUM_MESSAGES); + CountDownLatch[] startLatches = new CountDownLatch[NUM_MESSAGES]; + CountDownLatch[] doneLatches = new CountDownLatch[NUM_MESSAGES]; + for (int i = 0; i < NUM_MESSAGES; i++) { + startLatches[i] = new CountDownLatch(1); + doneLatches[i] = new CountDownLatch(1); + } + + doAnswer((Answer) invocation -> { + DistributionPackage pkg = invocation.getArgument(1); + String id = pkg.getId(); + int idx = Integer.parseInt(id.replace("pkg-", "")); + allEntered.countDown(); + startLatches[idx].await(); + doneLatches[idx].countDown(); + return null; + }).when(packageBuilder).installPackage(any(ResourceResolver.class), any(DistributionPackage.class)); + + PackageMessage add0 = packageMessage("pkg-0", ReqType.ADD); + PackageMessage add1 = packageMessage("pkg-1", ReqType.ADD); + PackageMessage del2 = packageMessage("pkg-2", ReqType.DELETE); + PackageMessage add3 = packageMessage("pkg-3", ReqType.ADD); + + MessageInfo info0 = createInfo(0); + MessageInfo info1 = createInfo(1); + MessageInfo info2 = createInfo(2); + MessageInfo info3 = createInfo(3); + + // packageHandler is the handler the subscriber registered with the poller; calling it + // invokes the subscriber's delegatePackageMessageToExecutor -> import executor -> handlePackageMessage path + packageHandler.handle(info0, add0); + packageHandler.handle(info1, add1); + packageHandler.handle(info2, del2); + packageHandler.handle(info3, add3); + + assertThat("All messages should enter installPackage", allEntered.await(AWAIT_SECONDS, SECONDS)); + + assertThat(getStoredOffset(), equalTo(-1L)); + + startLatches[2].countDown(); + assertThat("Offset 2 done", doneLatches[2].await(AWAIT_SECONDS, SECONDS)); + assertThat("Offset 2 completed first; stored offset must not advance (0,1 still in flight)", getStoredOffset(), equalTo(-1L)); + + startLatches[0].countDown(); + assertThat("Offset 0 done", doneLatches[0].await(AWAIT_SECONDS, SECONDS)); + await().atMost(AWAIT_SECONDS, SECONDS).until(() -> getStoredOffset() == 0L); + + startLatches[3].countDown(); + assertThat("Offset 3 done", doneLatches[3].await(AWAIT_SECONDS, SECONDS)); + await().atMost(AWAIT_SECONDS, SECONDS).until(() -> getStoredOffset() == 0L); + + startLatches[1].countDown(); + assertThat("Offset 1 done", doneLatches[1].await(AWAIT_SECONDS, SECONDS)); + await().atMost(AWAIT_SECONDS, SECONDS).until(() -> getStoredOffset() == 1L); + + waitSubscriberIdle(); + assertThat("Subscriber should be IDLE after all concurrent imports complete", subscriber.getState(), equalTo(IDLE)); + + verify(packageBuilder, times(NUM_MESSAGES)).installPackage(any(ResourceResolver.class), any(DistributionPackage.class)); + } + + private void assumePreconditionAccept() { + try { + when(precondition.canProcess(Mockito.eq(SUB_AGENT_NAME), Mockito.anyLong())).thenReturn(Decision.ACCEPT); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void initSubscriberWithConcurrentThreads(int threads) { + Map props = new HashMap<>(); + props.put("name", SUB_AGENT_NAME); + props.put("agentNames", PUB_AGENT_NAME); + props.put("idleMillies", 1000); + props.put("subscriberIdleCheck", false); // avoid SubscriberIdleCheck registration with context (can block on CI) + props.put("concurrentImporterThreads", threads); + + SubscriberConfiguration config = Converters.standardConverter().convert(props).to(SubscriberConfiguration.class); + OnlyOnLeader onlyOnLeader = new OnlyOnLeader(context); + + subscriber = new DistributionSubscriber( + packageBuilder, + slingSettings, + clientProvider, + precondition, + metricsService, + bookKeeperFactory, + subscriberReadyStore, + onlyOnLeader, + config, + context, + props); + + verify(clientProvider).createPoller( + Mockito.eq(Topics.PACKAGE_TOPIC), + Mockito.eq(Reset.latest), + Mockito.nullable(String.class), + packageCaptor.capture(), + Mockito.any()); + packageHandler = packageCaptor.getValue().getHandler(); + } + + private void waitSubscriberIdle() { + await().atMost(AWAIT_SECONDS, SECONDS).until(() -> subscriber.getState() == IDLE); + } + + private static PackageMessage packageMessage(String pkgId, ReqType reqType) { + return PackageMessage.builder() + .pkgId(pkgId) + .pubAgentName(PUB_AGENT_NAME) + .reqType(reqType) + .pkgType("journal") + .paths(Arrays.asList("/test")) + .pkgBinary(new byte[1]) + .build(); + } + + private static MessageInfo createInfo(long offset) { + return new TestMessageInfo("", 0, offset, System.currentTimeMillis()); + } + + private long getStoredOffset() { + LocalStore store = new LocalStore(resolverFactory, STORE_PACKAGE_NODE_NAME, SUB_AGENT_NAME); + Long value = store.load(BookKeeper.KEY_OFFSET, Long.class); + return value != null ? value : -1L; + } +} diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java index 1bdbe19c..93d221ba 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java @@ -22,7 +22,6 @@ import static org.apache.sling.distribution.agent.DistributionAgentState.IDLE; import static org.apache.sling.distribution.event.DistributionEventProperties.*; import static org.awaitility.Awaitility.await; -import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; @@ -232,7 +231,7 @@ public void before() throws URISyntaxException { } @After - public void after() throws IOException { + public void after() { subscriber.deactivate(); //verify(poller, atLeastOnce()).close(); } @@ -242,17 +241,17 @@ public void testReceiveNotSubscribed() throws DistributionException { assumeNoPrecondition(); initSubscriber(Collections.singletonMap("agentNames", "dummy")); assertThat(subscriber.getState(), equalTo(DistributionAgentState.IDLE)); - + MessageInfo info = createInfo(100); PackageMessage message = BASIC_ADD_PACKAGE; - packageHandler.handle(info, message); - - verify(packageBuilder, timeout(1000).times(0)).installPackage(any(ResourceResolver.class), - any(DistributionPackage.class)); - assertThat(getStoredOffset(), nullValue()); - for (int c=0; c < BookKeeper.COMMIT_AFTER_NUM_SKIPPED; c++) { + + for (int c = 0; c <= BookKeeper.COMMIT_AFTER_NUM_SKIPPED; c++) { packageHandler.handle(info, message); } + await().atMost(30, SECONDS).until(() -> getStoredOffset() != null && getStoredOffset().longValue() == 100L); + + verify(packageBuilder, times(0)).installPackage(any(ResourceResolver.class), + any(DistributionPackage.class)); assertThat(getStoredOffset(), equalTo(100l)); } @@ -264,9 +263,8 @@ public void testReceive() throws DistributionException { MessageInfo info = createInfo(0l); PackageMessage message = BASIC_ADD_PACKAGE; - packageHandler.handle(info, message); - + waitSubscriber(IDLE); verifyNoStatusMessageSent(); } @@ -277,18 +275,18 @@ public void testImportPreAndPostProcessInvoked() throws DistributionException, I assertThat(subscriber.getState(), equalTo(DistributionAgentState.IDLE)); MessageInfo info = createInfo(0l); PackageMessage message = BASIC_ADD_PACKAGE; + packageHandler.handle(info, message); - verifyNoStatusMessageSent(); - Map props = new HashMap<>(); props.put(DISTRIBUTION_TYPE, message.getReqType().name()); props.put(DISTRIBUTION_PATHS, message.getPaths()); props.put(DISTRIBUTION_PACKAGE_ID, message.getPkgId()); props.put(DISTRIBUTION_COMPONENT_NAME, message.getPubAgentName()); - verify(importPreProcessor, times(1)).process(props); - verify(importPostProcessor, times(1)).process(props); + verify(importPreProcessor, timeout(10000).times(1)).process(props); + verify(importPostProcessor, timeout(10000).times(1)).process(props); + verifyNoStatusMessageSent(); } @Test @@ -341,12 +339,11 @@ public void testSendFailedStatus() throws DistributionException { MessageInfo info = createInfo(0l); PackageMessage message = BASIC_ADD_PACKAGE; packageHandler.handle(info, message); - verifyStatusMessageSentWithStatus(Status.REMOVED_FAILED); } @Test - public void testSendSuccessStatus() throws DistributionException, InterruptedException { + public void testSendSuccessStatus() { assumeNoPrecondition(); // Only editable subscriber will send status initSubscriber(Collections.singletonMap("editable", "true")); @@ -359,7 +356,7 @@ public void testSendSuccessStatus() throws DistributionException, InterruptedExc } @Test - public void testSkipBecauseOfPrecondition() throws DistributionException, InterruptedException, TimeoutException { + public void testSkipBecauseOfPrecondition() { when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong())).thenReturn(Decision.SKIP); initSubscriber(Collections.singletonMap("editable", "true")); @@ -367,7 +364,7 @@ public void testSkipBecauseOfPrecondition() throws DistributionException, Interr PackageMessage message = BASIC_ADD_PACKAGE; packageHandler.handle(info, message); - await().until(this::getStoredStatus, equalTo(PackageStatusMessage.Status.REMOVED)); + await().atMost(10, SECONDS).until(this::getStoredStatus, equalTo(PackageStatusMessage.Status.REMOVED)); verifyStatusMessageSentWithStatus(Status.REMOVED); }