Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

Expand All @@ -48,6 +50,7 @@
import org.apache.sling.distribution.InvalidationProcessException;
import org.apache.sling.distribution.InvalidationProcessor;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.impl.event.DistributionFailureEvent;
import org.apache.sling.distribution.journal.messages.LogMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage;
Expand Down Expand Up @@ -78,6 +81,14 @@
* The clustered and non clustered publish instances use
* cases can be supported by only running the Subscriber
* agent on the leader instance.
*
*
* The BookKeeper supports the concurrent handling of packages; for that it
* keeps track of messages which are submitted to importPackages() or
* skipPackages(), as both can take longer to being processed. These in-flight messages
* are stored in the messagesBeingProcessed set; an offsets is only persisted
* if this offset is the lowest offset of all messages in that set.
*
*/
public class BookKeeper {
public static final String STORE_TYPE_STATUS = "statuses";
Expand Down Expand Up @@ -107,6 +118,9 @@ public class BookKeeper {
private final ImportPostProcessor importPostProcessor;
private final InvalidationProcessor invalidationProcessor;
private int skippedCounter = 0;

private Set<MessageInfo> messagesBeingProcessed = new HashSet<>();


public BookKeeper(ResourceResolverFactory resolverFactory, SubscriberMetrics subscriberMetrics,
PackageHandler packageHandler, EventAdmin eventAdmin, Consumer<PackageStatusMessage> sender, Consumer<LogMessage> logSender,
Expand Down Expand Up @@ -152,10 +166,12 @@ public BookKeeper(ResourceResolverFactory resolverFactory, SubscriberMetrics sub
* failing. For those packages importers, we aim at processing packages at least
* once, thanks to the order in which the content updates are applied.
*/
public void importPackage(PackageMessage pkgMsg, long offset, Date createdTime, Date importStartTime) throws DistributionException {
public void importPackage(PackageMessage pkgMsg, MessageInfo message, Date createdTime, Date importStartTime) throws DistributionException {
long offset = message.getOffset();
log.debug("Importing distribution package {} at offset={}", pkgMsg, offset);
try (Timer.Context context = subscriberMetrics.getImportedPackageDuration().time();
ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
recordMessageProcessingStart(message);
// Execute the pre-processor
preProcess(pkgMsg);
subscriberMetrics.setCurrentImport(new CurrentImportInfo(pkgMsg, offset, importStartTime.getTime()));
Expand All @@ -165,6 +181,7 @@ public void importPackage(PackageMessage pkgMsg, long offset, Date createdTime,
}
storeOffset(importerResolver, offset);
importerResolver.commit();
recordMessageProcessingCompleted(message);
subscriberMetrics.getImportedPackageSize().update(pkgMsg.getPkgLength());
subscriberMetrics.getPackageDistributedDuration().update((currentTimeMillis() - createdTime.getTime()), TimeUnit.MILLISECONDS);

Expand All @@ -184,10 +201,20 @@ public void importPackage(PackageMessage pkgMsg, long offset, Date createdTime,
subscriberMetrics.clearCurrentImport();
}
}

private synchronized void recordMessageProcessingStart(MessageInfo message) {
messagesBeingProcessed.add(message);
}

private synchronized void recordMessageProcessingCompleted(MessageInfo message) {
messagesBeingProcessed.remove(message);
}

public void invalidateCache(PackageMessage pkgMsg, long offset, Date createdTime, Date importStartTime) throws DistributionException {
public void invalidateCache(PackageMessage pkgMsg, MessageInfo message, Date createdTime, Date importStartTime) throws DistributionException {
long offset = message.getOffset();
log.debug("Invalidating the cache for the package {} at offset={}", pkgMsg, offset);
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
recordMessageProcessingStart(message);
Map<String, Object> props = this.buildProcessorPropertiesFromMessage(pkgMsg);

long invalidationStartTime = currentTimeMillis();
Expand All @@ -201,6 +228,7 @@ public void invalidateCache(PackageMessage pkgMsg, long offset, Date createdTime

storeOffset(resolver, offset);
resolver.commit();
recordMessageProcessingCompleted(message);

clearPackageRetriesOnSuccess(pkgMsg);

Expand Down Expand Up @@ -446,8 +474,27 @@ private void storeStatus(ResourceResolver resolver, PackageStatus packageStatus)
log.info("Stored status {}", statusMap);
}

private void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException {
processedOffsets.store(resolver, KEY_OFFSET, offset);
/**
* Store the provided offset in the repository. This offset is only processed if it has the smallest
* offset of all entries of the messagesBeingImported set; that indicates that all messages with lower offsets
* have already been processed, and that it's safe now to mark this offset as the latest persisted one.
* @param offset the offset to persist
* @throws PersistenceException
*/
private synchronized void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException {
long smallestOffset = Long.MAX_VALUE;
if (messagesBeingProcessed.isEmpty()) {
smallestOffset = offset; // we have to store the offset if no other message is being processed concurrently
} else {
for (MessageInfo mi: messagesBeingProcessed) {
if (mi.getOffset() < smallestOffset) {
smallestOffset = mi.getOffset();
}
}
}
if (smallestOffset == offset) {
processedOffsets.store(resolver, KEY_OFFSET, offset);
}
}

private ResourceResolver getServiceResolver(String subService) throws LoginException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand Down Expand Up @@ -134,6 +138,8 @@ public class DistributionSubscriber {
private final Delay delay = new Delay();
private AtomicReference<DistributionAgentState> state = new AtomicReference<DistributionAgentState>(DistributionAgentState.IDLE);

private ExecutorService importExecutor;

@Activate
public DistributionSubscriber(
@Reference(name = "packageBuilder") DistributionPackageBuilder packageBuilder,
Expand Down Expand Up @@ -192,15 +198,29 @@ public DistributionSubscriber(
String assign = startOffset > 0 ? messagingProvider.assignTo(startOffset) : null;

packagePoller = messagingProvider.createPoller(Topics.PACKAGE_TOPIC, Reset.latest, assign,
HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage), HandlerAdapter.create(OffsetMessage.class, this::handleOffsetMessage));
HandlerAdapter.create(PackageMessage.class, this::delegatePackageMessageToExecutor), HandlerAdapter.create(OffsetMessage.class, this::handleOffsetMessage));

int announceDelay = Converters.standardConverter().convert(properties.get("announceDelay")).defaultValue(10000).to(Integer.class);
announcer = new Announcer(subSlingId, subAgentName, queueNames,
messagingProvider.createSender(Topics.DISCOVERY_TOPIC), bookKeeper,
config.maxRetries(), config.editable(), announceDelay);

importExecutor = Executors.newFixedThreadPool(config.concurrentImporterThreads(), new ThreadFactory() {

LOG.info("Started Subscriber agent={} at offset={}, subscribed to agent names {}, readyCheck={}", subAgentName, startOffset,
queueNames, config.subscriberIdleCheck());
AtomicInteger id = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
int no = id.incrementAndGet();
Thread t = new Thread(r);
t.setName("DistributionSubscriber-importer-" + no);
return t;
}
});

LOG.info("Started Subscriber agent={} at offset={}, subscribed to agent names {}, readyCheck={}, concurrent importer threads={}",
subAgentName, startOffset, queueNames, config.subscriberIdleCheck(),
config.concurrentImporterThreads());
}

private String getFirst(String[] agentNames) {
Expand Down Expand Up @@ -235,6 +255,18 @@ public void deactivate() {

IOUtils.closeQuietly(announcer, packagePoller, idleReadyCheck, idleCheck, commandPoller);
running = false;
if (importExecutor != null) {
importExecutor.shutdown();
try {
if (!importExecutor.awaitTermination(1, MINUTES)) {
importExecutor.shutdownNow();
}
} catch (InterruptedException e) {
LOG.error("Caught an unexpected ThreadInterrupted exception during shutdown, interrupting thread now",e);
Thread.currentThread().interrupt();
importExecutor.shutdownNow();
}
}
LOG.info("Stopped Subscriber agent {}, subscribed to Publisher agent names {} with package builder {}",
subAgentName, queueNames, pkgType);
}
Expand All @@ -244,6 +276,13 @@ public DistributionAgentState getState() {
return (isBlocked) ? DistributionAgentState.BLOCKED : state.get();
}

/**
* Delegates the rest of the execution into a thread of the executor for async execution
*/
private void delegatePackageMessageToExecutor(MessageInfo info, PackageMessage message) {
importExecutor.submit(() -> handlePackageMessage(info, message));
}

private void handlePackageMessage(MessageInfo info, PackageMessage message) {
boolean done = false;
while (!done && running) {
Expand Down Expand Up @@ -332,9 +371,9 @@ private void processPackageMessage(MessageInfo info, PackageMessage pkgMsg)
if (skip) {
bookKeeper.removePackage(pkgMsg, info.getOffset());
} else if (type == INVALIDATE) {
bookKeeper.invalidateCache(pkgMsg, info.getOffset(), createdTime, importStartTime);
bookKeeper.invalidateCache(pkgMsg, info, createdTime, importStartTime);
} else {
bookKeeper.importPackage(pkgMsg, info.getOffset(), createdTime, importStartTime);
bookKeeper.importPackage(pkgMsg, info, createdTime, importStartTime);
}
blockingSendStoredStatus();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,7 @@

@AttributeDefinition(description = "Number of ms to wait before retrying to process a package.")
int acceptableAgeDiffMs() default 120 * 1000;

@AttributeDefinition(description = "Number of threads importing packages (default 1)")
int concurrentImporterThreads() default 1;
}
Loading