Skip to content

Commit 963dc64

Browse files
committed
Reapply "OAK-11766 Write Throttling Mechanism - Session.save() delay (#2339)"
This reverts commit 7c96ca5.
1 parent 8b39898 commit 963dc64

File tree

10 files changed

+1560
-4
lines changed

10 files changed

+1560
-4
lines changed

oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/RepositoryManagementMBean.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,4 +263,20 @@ CompositeData startDataStoreGC(@Name("markOnly")
263263
@Description("Refresh all currently open sessions")
264264
TabularData refreshAllSessions();
265265

266+
/**
267+
* Get the Session.save() delay configuration.
268+
*
269+
* @return the configuration
270+
*/
271+
@Description("The Session.save() delay configuration")
272+
String getSessionSaveDelayerConfig();
273+
274+
/**
275+
* Set the Session.save() delay configuration.
276+
*
277+
* @param config the new configuration
278+
*/
279+
@Description("The Session.save() delay configuration")
280+
void setSessionSaveDelayerConfig(String config);
281+
266282
}

oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
@Version("4.14.0")
18+
@Version("4.15.0")
1919
package org.apache.jackrabbit.oak.api.jmx;
2020

2121
import org.osgi.annotation.versioning.Version;

oak-core/src/main/java/org/apache/jackrabbit/oak/management/RepositoryManager.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
*/
5757
public class RepositoryManager extends AnnotatedStandardMBean implements RepositoryManagementMBean {
5858
private final Whiteboard whiteboard;
59+
private String sessionSaveConfig;
5960

6061
public RepositoryManager(@NotNull Whiteboard whiteboard) {
6162
super(RepositoryManagementMBean.class);
@@ -277,4 +278,14 @@ public Status apply(SessionMBean sessionMBean) {
277278
}
278279
}));
279280
}
281+
282+
@Override
283+
public String getSessionSaveDelayerConfig() {
284+
return sessionSaveConfig;
285+
}
286+
287+
@Override
288+
public void setSessionSaveDelayerConfig(String config) {
289+
this.sessionSaveConfig = config;
290+
}
280291
}

oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy;
5555
import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy.Composite;
5656
import org.apache.jackrabbit.oak.jcr.session.SessionNamespaces;
57+
import org.apache.jackrabbit.oak.jcr.session.SessionSaveDelayer;
5758
import org.apache.jackrabbit.oak.jcr.session.SessionStats;
5859
import org.apache.jackrabbit.oak.jcr.session.SessionStats.Counters;
5960
import org.apache.jackrabbit.oak.jcr.session.operation.SessionOperation;
@@ -138,6 +139,8 @@ public class SessionDelegate {
138139

139140
private final SessionNamespaces namespaces;
140141

142+
private final SessionSaveDelayer sessionSaveDelayer;
143+
141144
/**
142145
* Create a new session delegate for a {@code ContentSession}. The refresh behaviour of the
143146
* session is governed by the value of the {@code refreshInterval} argument: if the session
@@ -150,14 +153,16 @@ public class SessionDelegate {
150153
* @param securityProvider the security provider
151154
* @param refreshStrategy the refresh strategy used for auto refreshing this session
152155
* @param statisticManager the statistics manager for tracking session operations
156+
* @param sessionSaveDelayer the session save delay mechanism
153157
*/
154158
public SessionDelegate(
155159
@NotNull ContentSession contentSession,
156160
@NotNull SecurityProvider securityProvider,
157161
@NotNull RefreshStrategy refreshStrategy,
158162
@NotNull ThreadLocal<Long> threadSaveCount,
159163
@NotNull StatisticManager statisticManager,
160-
@NotNull Clock clock) {
164+
@NotNull Clock clock,
165+
@NotNull SessionSaveDelayer sessionSaveDelayer) {
161166
this.contentSession = requireNonNull(contentSession);
162167
this.securityProvider = requireNonNull(securityProvider);
163168
this.root = contentSession.getLatestRoot();
@@ -176,6 +181,7 @@ refreshAtNextAccess, saveCountRefresh, new RefreshNamespaces(
176181
readDuration = statisticManager.getTimer(SESSION_READ_DURATION);
177182
writeCounter = statisticManager.getMeter(SESSION_WRITE_COUNTER);
178183
writeDuration = statisticManager.getTimer(SESSION_WRITE_DURATION);
184+
this.sessionSaveDelayer = sessionSaveDelayer;
179185
}
180186

181187
@NotNull
@@ -392,6 +398,7 @@ private void commit(Root root, String path) throws CommitFailedException {
392398
if (userData != null) {
393399
info.put(EventFactory.USER_DATA, userData);
394400
}
401+
sessionSaveDelayer.delayIfNeeded(userData);
395402
root.commit(Collections.unmodifiableMap(info));
396403
if (permissionProvider != null && refreshPermissionProvider) {
397404
permissionProvider.refresh();

oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy;
5555
import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy.Composite;
5656
import org.apache.jackrabbit.oak.jcr.session.SessionContext;
57+
import org.apache.jackrabbit.oak.jcr.session.SessionSaveDelayer;
5758
import org.apache.jackrabbit.oak.jcr.session.SessionStats;
5859
import org.apache.jackrabbit.oak.jcr.version.FrozenNodeLogger;
5960
import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
@@ -120,6 +121,7 @@ public class RepositoryImpl implements JackrabbitRepository {
120121
private final MountInfoProvider mountInfoProvider;
121122
private final BlobAccessProvider blobAccessProvider;
122123
private final SessionQuerySettingsProvider sessionQuerySettingsProvider;
124+
private final SessionSaveDelayer sessionSaveDelayer;
123125

124126
/**
125127
* {@link ThreadLocal} counter that keeps track of the save operations
@@ -176,6 +178,7 @@ public RepositoryImpl(@NotNull ContentRepository contentRepository,
176178
this.frozenNodeLogger = new FrozenNodeLogger(clock, whiteboard);
177179
this.sessionQuerySettingsProvider = Optional.ofNullable(WhiteboardUtils.getService(whiteboard, SessionQuerySettingsProvider.class))
178180
.orElseGet(() -> new FastQuerySizeSettingsProvider(fastQueryResultSize));
181+
this.sessionSaveDelayer = new SessionSaveDelayer(whiteboard);
179182
}
180183

181184
//---------------------------------------------------------< Repository >---
@@ -324,7 +327,7 @@ private SessionDelegate createSessionDelegate(
324327

325328
return new SessionDelegate(
326329
contentSession, securityProvider, refreshStrategy,
327-
threadSaveCount, statisticManager, clock) {
330+
threadSaveCount, statisticManager, clock, sessionSaveDelayer) {
328331

329332
// Defer session MBean registration to avoid cluttering the
330333
// JMX name space with short lived sessions
@@ -354,6 +357,7 @@ public void shutdown() {
354357
statisticManager.dispose();
355358
gcMonitorRegistration.unregister();
356359
frozenNodeLogger.close();
360+
sessionSaveDelayer.close();
357361
clock.close();
358362
new ExecutorCloser(scheduledExecutor).close();
359363
if (contentRepository instanceof Closeable) {
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.jackrabbit.oak.jcr.session;
18+
19+
import static org.apache.jackrabbit.oak.spi.toggle.Feature.newFeature;
20+
21+
import java.io.Closeable;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
24+
import org.apache.jackrabbit.guava.common.base.Strings;
25+
import org.apache.jackrabbit.oak.api.jmx.RepositoryManagementMBean;
26+
import org.apache.jackrabbit.oak.spi.toggle.Feature;
27+
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
28+
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
29+
import org.jetbrains.annotations.NotNull;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
/**
34+
* A delay mechanism for Session.save() operations. By default, Session.save
35+
* calls are not delayed. If enabled, some of the save() operations can be
36+
* delayed for a certain number of microseconds.
37+
*
38+
* This facility is enabled / disabled via feature toggle, and controlled via
39+
* JMX bean, or (for testing) via two system properties. There is no attempt to
40+
* control the delay, or which threads to delay, from within. It is meant for
41+
* emergency situation, specially for cases where some threads write too much.
42+
*/
43+
public class SessionSaveDelayer implements Closeable {
44+
45+
private static final Logger LOG = LoggerFactory.getLogger(SessionSaveDelayer.class);
46+
47+
private static final String FT_SAVE_DELAY_NAME = "FT_SAVE_DELAY_OAK-11766";
48+
private static final String ENABLED_PROP_NAME = "oak.sessionSaveDelayer";
49+
private static final String CONFIG_PROP_NAME = "oak.sessionSaveDelayerConfig";
50+
51+
private final boolean enabledViaSysPropertey = Boolean.getBoolean(ENABLED_PROP_NAME);
52+
private final String sysPropertyConfig = System.getProperty(CONFIG_PROP_NAME, "");
53+
private final Feature feature;
54+
private final Whiteboard whiteboard;
55+
private final AtomicBoolean closed = new AtomicBoolean();
56+
57+
private RepositoryManagementMBean cachedMbean;
58+
private String lastConfigJson;
59+
private SessionSaveDelayerConfig lastConfig;
60+
private volatile boolean logNextDelay;
61+
62+
public SessionSaveDelayer(@NotNull Whiteboard whiteboard) {
63+
this.feature = newFeature(FT_SAVE_DELAY_NAME, whiteboard);
64+
LOG.info("Initialized");
65+
if (enabledViaSysPropertey) {
66+
LOG.info("Enabled via system property: " + ENABLED_PROP_NAME);
67+
}
68+
this.whiteboard = whiteboard;
69+
}
70+
71+
private RepositoryManagementMBean getRepositoryMBean() {
72+
if (cachedMbean == null) {
73+
cachedMbean = WhiteboardUtils.getService(whiteboard, RepositoryManagementMBean.class);
74+
}
75+
return cachedMbean;
76+
}
77+
78+
public long delayIfNeeded(String userData) {
79+
if (closed.get() || (!feature.isEnabled() && !enabledViaSysPropertey)) {
80+
return 0;
81+
}
82+
String config = sysPropertyConfig;
83+
RepositoryManagementMBean mbean = getRepositoryMBean();
84+
if (mbean != null) {
85+
String jmxConfig = mbean.getSessionSaveDelayerConfig();
86+
if (!Strings.isNullOrEmpty(jmxConfig)) {
87+
config = jmxConfig;
88+
}
89+
}
90+
if (Strings.isNullOrEmpty(config)) {
91+
return 0;
92+
}
93+
if (!config.equals(lastConfigJson)) {
94+
logNextDelay = true;
95+
lastConfigJson = config;
96+
try {
97+
// reset, if already set
98+
lastConfig = null;
99+
lastConfig = SessionSaveDelayerConfig.fromJson(config);
100+
LOG.info("New config: {}", lastConfig.toString());
101+
} catch (IllegalArgumentException e) {
102+
LOG.warn("Can not parse config {}", e);
103+
// don't delay
104+
return 0;
105+
}
106+
}
107+
if (lastConfig == null) {
108+
return 0;
109+
}
110+
String threadName = Thread.currentThread().getName();
111+
long delayNanos = lastConfig.getDelayNanos(threadName, userData, null);
112+
if (delayNanos > 0) {
113+
long millis = delayNanos / 1_000_000;
114+
int nanos = (int) (delayNanos % 1_000_000);
115+
if (logNextDelay) {
116+
LOG.info("Sleep {} ms {} ns for user {}", millis, nanos, userData);
117+
logNextDelay = false;
118+
}
119+
try {
120+
Thread.sleep(millis, nanos);
121+
} catch (InterruptedException e) {
122+
// ignore
123+
Thread.currentThread().interrupt();
124+
}
125+
}
126+
return delayNanos;
127+
}
128+
129+
@Override
130+
public void close() {
131+
if (!closed.getAndSet(true)) {
132+
feature.close();
133+
}
134+
}
135+
136+
}

0 commit comments

Comments
 (0)