Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,20 @@ CompositeData startDataStoreGC(@Name("markOnly")
@Description("Refresh all currently open sessions")
TabularData refreshAllSessions();

/**
* Get the Session.save() delay configuration.
*
* @return the configuration
*/
@Description("The Session.save() delay configuration")
String getSessionSaveDelayerConfig();

/**
* Set the Session.save() delay configuration.
*
* @param config the new configuration
*/
@Description("The Session.save() delay configuration")
void setSessionSaveDelayerConfig(String config);

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

@Version("4.14.0")
@Version("4.15.0")
package org.apache.jackrabbit.oak.api.jmx;

import org.osgi.annotation.versioning.Version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
*/
public class RepositoryManager extends AnnotatedStandardMBean implements RepositoryManagementMBean {
private final Whiteboard whiteboard;
private String sessionSaveConfig;

public RepositoryManager(@NotNull Whiteboard whiteboard) {
super(RepositoryManagementMBean.class);
Expand Down Expand Up @@ -277,4 +278,14 @@ public Status apply(SessionMBean sessionMBean) {
}
}));
}

@Override
public String getSessionSaveDelayerConfig() {
return sessionSaveConfig;
}

@Override
public void setSessionSaveDelayerConfig(String config) {
this.sessionSaveConfig = config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy;
import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy.Composite;
import org.apache.jackrabbit.oak.jcr.session.SessionNamespaces;
import org.apache.jackrabbit.oak.jcr.session.SessionSaveDelayer;
import org.apache.jackrabbit.oak.jcr.session.SessionStats;
import org.apache.jackrabbit.oak.jcr.session.SessionStats.Counters;
import org.apache.jackrabbit.oak.jcr.session.operation.SessionOperation;
Expand Down Expand Up @@ -138,6 +139,8 @@ public class SessionDelegate {

private final SessionNamespaces namespaces;

private final SessionSaveDelayer sessionSaveDelayer;

/**
* Create a new session delegate for a {@code ContentSession}. The refresh behaviour of the
* session is governed by the value of the {@code refreshInterval} argument: if the session
Expand All @@ -150,14 +153,16 @@ public class SessionDelegate {
* @param securityProvider the security provider
* @param refreshStrategy the refresh strategy used for auto refreshing this session
* @param statisticManager the statistics manager for tracking session operations
* @param sessionSaveDelayer the session save delay mechanism
*/
public SessionDelegate(
@NotNull ContentSession contentSession,
@NotNull SecurityProvider securityProvider,
@NotNull RefreshStrategy refreshStrategy,
@NotNull ThreadLocal<Long> threadSaveCount,
@NotNull StatisticManager statisticManager,
@NotNull Clock clock) {
@NotNull Clock clock,
@NotNull SessionSaveDelayer sessionSaveDelayer) {
this.contentSession = requireNonNull(contentSession);
this.securityProvider = requireNonNull(securityProvider);
this.root = contentSession.getLatestRoot();
Expand All @@ -176,6 +181,7 @@ refreshAtNextAccess, saveCountRefresh, new RefreshNamespaces(
readDuration = statisticManager.getTimer(SESSION_READ_DURATION);
writeCounter = statisticManager.getMeter(SESSION_WRITE_COUNTER);
writeDuration = statisticManager.getTimer(SESSION_WRITE_DURATION);
this.sessionSaveDelayer = sessionSaveDelayer;
}

@NotNull
Expand Down Expand Up @@ -392,6 +398,7 @@ private void commit(Root root, String path) throws CommitFailedException {
if (userData != null) {
info.put(EventFactory.USER_DATA, userData);
}
sessionSaveDelayer.delayIfNeeded(userData);
root.commit(Collections.unmodifiableMap(info));
if (permissionProvider != null && refreshPermissionProvider) {
permissionProvider.refresh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy;
import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy.Composite;
import org.apache.jackrabbit.oak.jcr.session.SessionContext;
import org.apache.jackrabbit.oak.jcr.session.SessionSaveDelayer;
import org.apache.jackrabbit.oak.jcr.session.SessionStats;
import org.apache.jackrabbit.oak.jcr.version.FrozenNodeLogger;
import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
Expand Down Expand Up @@ -120,6 +121,7 @@ public class RepositoryImpl implements JackrabbitRepository {
private final MountInfoProvider mountInfoProvider;
private final BlobAccessProvider blobAccessProvider;
private final SessionQuerySettingsProvider sessionQuerySettingsProvider;
private final SessionSaveDelayer sessionSaveDelayer;

/**
* {@link ThreadLocal} counter that keeps track of the save operations
Expand Down Expand Up @@ -176,6 +178,7 @@ public RepositoryImpl(@NotNull ContentRepository contentRepository,
this.frozenNodeLogger = new FrozenNodeLogger(clock, whiteboard);
this.sessionQuerySettingsProvider = Optional.ofNullable(WhiteboardUtils.getService(whiteboard, SessionQuerySettingsProvider.class))
.orElseGet(() -> new FastQuerySizeSettingsProvider(fastQueryResultSize));
this.sessionSaveDelayer = new SessionSaveDelayer(whiteboard);
}

//---------------------------------------------------------< Repository >---
Expand Down Expand Up @@ -324,7 +327,7 @@ private SessionDelegate createSessionDelegate(

return new SessionDelegate(
contentSession, securityProvider, refreshStrategy,
threadSaveCount, statisticManager, clock) {
threadSaveCount, statisticManager, clock, sessionSaveDelayer) {

// Defer session MBean registration to avoid cluttering the
// JMX name space with short lived sessions
Expand Down Expand Up @@ -354,6 +357,7 @@ public void shutdown() {
statisticManager.dispose();
gcMonitorRegistration.unregister();
frozenNodeLogger.close();
sessionSaveDelayer.close();
clock.close();
new ExecutorCloser(scheduledExecutor).close();
if (contentRepository instanceof Closeable) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.jcr.session;

import static org.apache.jackrabbit.oak.spi.toggle.Feature.newFeature;

import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.jackrabbit.guava.common.base.Strings;
import org.apache.jackrabbit.oak.api.jmx.RepositoryManagementMBean;
import org.apache.jackrabbit.oak.spi.toggle.Feature;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A delay mechanism for Session.save() operations. By default, Session.save
* calls are not delayed. If enabled, some of the save() operations can be
* delayed for a certain number of microseconds.
*
* This facility is enabled / disabled via feature toggle, and controlled via
* JMX bean, or (for testing) via two system properties. There is no attempt to
* control the delay, or which threads to delay, from within. It is meant for
* emergency situation, specially for cases where some threads write too much.
*/
public class SessionSaveDelayer implements Closeable {

private static final Logger LOG = LoggerFactory.getLogger(SessionSaveDelayer.class);

private static final String FT_SAVE_DELAY_NAME = "FT_SAVE_DELAY_OAK-11766";
private static final String ENABLED_PROP_NAME = "oak.sessionSaveDelayer";
private static final String CONFIG_PROP_NAME = "oak.sessionSaveDelayerConfig";

private final boolean enabledViaSysPropertey = Boolean.getBoolean(ENABLED_PROP_NAME);
private final String sysPropertyConfig = System.getProperty(CONFIG_PROP_NAME, "");
private final Feature feature;
private final Whiteboard whiteboard;
private final AtomicBoolean closed = new AtomicBoolean();

private RepositoryManagementMBean cachedMbean;
private String lastConfigJson;
private SessionSaveDelayerConfig lastConfig;
private volatile boolean logNextDelay;

public SessionSaveDelayer(@NotNull Whiteboard whiteboard) {
this.feature = newFeature(FT_SAVE_DELAY_NAME, whiteboard);
LOG.info("Initialized");
if (enabledViaSysPropertey) {
LOG.info("Enabled via system property: " + ENABLED_PROP_NAME);
}
this.whiteboard = whiteboard;
}

private RepositoryManagementMBean getRepositoryMBean() {
if (cachedMbean == null) {
cachedMbean = WhiteboardUtils.getService(whiteboard, RepositoryManagementMBean.class);
}
return cachedMbean;
}

public long delayIfNeeded(String userData) {
if (closed.get() || (!feature.isEnabled() && !enabledViaSysPropertey)) {
return 0;
}
String config = sysPropertyConfig;
RepositoryManagementMBean mbean = getRepositoryMBean();
if (mbean != null) {
String jmxConfig = mbean.getSessionSaveDelayerConfig();
if (!Strings.isNullOrEmpty(jmxConfig)) {
config = jmxConfig;
}
}
if (Strings.isNullOrEmpty(config)) {
return 0;
}
if (!config.equals(lastConfigJson)) {
logNextDelay = true;
lastConfigJson = config;
try {
// reset, if already set
lastConfig = null;
lastConfig = SessionSaveDelayerConfig.fromJson(config);
LOG.info("New config: {}", lastConfig.toString());
} catch (IllegalArgumentException e) {
LOG.warn("Can not parse config {}", e);
// don't delay
return 0;
}
}
if (lastConfig == null) {
return 0;
}
String threadName = Thread.currentThread().getName();
long delayNanos = lastConfig.getDelayNanos(threadName, userData, null);
if (delayNanos > 0) {
long millis = delayNanos / 1_000_000;
int nanos = (int) (delayNanos % 1_000_000);
if (logNextDelay) {
LOG.info("Sleep {} ms {} ns for user {}", millis, nanos, userData);
logNextDelay = false;
}
try {
Thread.sleep(millis, nanos);
} catch (InterruptedException e) {
// ignore
Thread.currentThread().interrupt();
}
}
return delayNanos;
}

@Override
public void close() {
if (!closed.getAndSet(true)) {
feature.close();
}
}

}
Loading
Loading