Skip to content

Commit 09c4fc0

Browse files
authored
Merge branch 'master' into fix-url-normalization
2 parents 358d037 + 9babe49 commit 09c4fc0

File tree

3 files changed

+166
-12
lines changed

3 files changed

+166
-12
lines changed

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,30 +101,56 @@ public void start(String name) {
101101
if (this.executor == null) {
102102
return;
103103
}
104+
104105
LOG.info("Starting {} workers[{}] with queue size {}...",
105106
this.workers, name, this.queueSize);
107+
106108
for (int i = 0; i < this.workers; i++) {
107-
this.runningFutures.add(
108-
this.executor.submit(new ContextCallable<>(this::runAndDone)));
109+
// capture submission thread context HERE
110+
ContextCallable<Void> task = new ContextCallable<>(this::runAndDone);
111+
112+
// wrapper ensures latch always decremented even if ContextCallable fails
113+
this.runningFutures.add(this.executor.submit(() -> this.safeRun(task)));
114+
}
115+
}
116+
117+
private Void safeRun(ContextCallable<Void> task) {
118+
try {
119+
return task.call(); // may fail before/after runAndDone()
120+
} catch (Exception e) {
121+
// This exception is from ContextCallable wrapper (setContext/resetContext/delegate dispatch),
122+
// not from runAndDone() business logic (that one is handled inside runAndDone()).
123+
if (this.exception == null) {
124+
this.exception = e;
125+
LOG.error("Consumer worker failed in ContextCallable wrapper", e);
126+
} else {
127+
LOG.warn("Additional worker failure in ContextCallable wrapper; first exception already recorded", e);
128+
}
129+
this.exceptionHandle(e);
130+
} finally {
131+
this.latch.countDown();
109132
}
133+
return null;
110134
}
111135

112136
private Void runAndDone() {
113137
try {
114138
this.run();
115-
} catch (Throwable e) {
139+
} catch (Exception e) {
116140
if (e instanceof StopExecution) {
117141
this.queue.clear();
118142
putQueueEnd();
119143
} else {
120-
// Only the first exception to one thread can be stored
121-
this.exception = e;
122-
LOG.error("Error when running task", e);
144+
if (this.exception == null) {
145+
this.exception = e;
146+
LOG.error("Unhandled exception in consumer task", e);
147+
} else {
148+
LOG.warn("Additional exception in consumer task; first exception already recorded", e);
149+
}
123150
}
124-
exceptionHandle(e);
151+
this.exceptionHandle(e);
125152
} finally {
126153
this.done();
127-
this.latch.countDown();
128154
}
129155
return null;
130156
}

hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/CoreTestSuite.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.hugegraph.testutil.Utils;
2727
import org.apache.hugegraph.util.Log;
2828
import org.junit.AfterClass;
29-
import org.junit.Assert;
3029
import org.junit.BeforeClass;
3130
import org.junit.runner.RunWith;
3231
import org.junit.runners.Suite;
@@ -52,11 +51,29 @@
5251
public class CoreTestSuite {
5352

5453
private static boolean registered = false;
55-
private static HugeGraph graph = null;
54+
private static volatile HugeGraph graph = null;
5655

5756
public static HugeGraph graph() {
58-
Assert.assertNotNull(graph);
59-
//Assert.assertFalse(graph.closed());
57+
if (graph == null) {
58+
synchronized (CoreTestSuite.class) {
59+
if (graph == null) {
60+
try {
61+
initEnv();
62+
init();
63+
} catch (Throwable e) {
64+
LOG.error("Failed to initialize HugeGraph instance", e);
65+
graph = null;
66+
throw new RuntimeException("Failed to initialize HugeGraph instance", e);
67+
}
68+
if (graph == null) {
69+
String msg = "HugeGraph instance is null after initialization. " +
70+
"Please check Utils.open() configuration.";
71+
LOG.error(msg);
72+
throw new IllegalStateException(msg);
73+
}
74+
}
75+
}
76+
}
6077
return graph;
6178
}
6279

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hugegraph.unit.util;
19+
20+
import java.util.concurrent.ExecutorService;
21+
import java.util.concurrent.Executors;
22+
import java.util.concurrent.atomic.AtomicInteger;
23+
24+
import org.apache.hugegraph.testutil.Assert;
25+
import org.apache.hugegraph.util.Consumers;
26+
import org.junit.Test;
27+
28+
public class ConsumersTest {
29+
30+
@Test(timeout = 1000)
31+
public void testStartProvideAwaitNormal() throws Throwable {
32+
ExecutorService executor = Executors.newFixedThreadPool(2);
33+
try {
34+
AtomicInteger processed = new AtomicInteger();
35+
36+
Consumers<Integer> consumers = new Consumers<>(executor, v -> {
37+
processed.incrementAndGet();
38+
});
39+
40+
consumers.start("test");
41+
for (int i = 0; i < 50; i++) {
42+
consumers.provide(i);
43+
}
44+
consumers.await();
45+
46+
Assert.assertEquals("Should process all provided elements",
47+
50, processed.get());
48+
} finally {
49+
executor.shutdownNow();
50+
}
51+
}
52+
53+
/**
54+
* Regression test for deadlock:
55+
*
56+
* ContextCallable fails before entering runAndDone().
57+
* await() must still return because latch is decremented in safeRun().
58+
*/
59+
@Test(timeout = 1000)
60+
public void testAwaitDoesNotHangWhenContextCallableFails() throws Throwable {
61+
ExecutorService executor = Executors.newFixedThreadPool(1);
62+
try {
63+
// Use AssertionError to bypass the inner catch(Exception) loop in runAndDone()
64+
// This simulates a scenario where an exception escapes the task logic
65+
// (similar to how a ContextCallable failure would behave from safeRun's perspective)
66+
Consumers<Integer> consumers = new Consumers<>(executor, v -> {
67+
throw new AssertionError("Simulated fatal error (OOM/StackOverflow/etc)");
68+
});
69+
consumers.start("test-fatal-error");
70+
consumers.provide(1);
71+
// Verification:
72+
// Without the fix, the latch would never be decremented (because runAndDone crashes), causing await() to hang.
73+
// With the fix (safeRun wrapper), the finally block ensures latch.countDown() is called.
74+
consumers.await();
75+
76+
// Note: consumer.exception will be null because safeRun only catches Exception, not Error.
77+
// This is acceptable behavior for fatal errors, as long as it doesn't deadlock.
78+
} finally {
79+
executor.shutdownNow();
80+
}
81+
}
82+
83+
@Test(timeout = 1000)
84+
public void testAwaitThrowsWhenConsumerThrows() throws Throwable {
85+
ExecutorService executor = Executors.newFixedThreadPool(2);
86+
try {
87+
final String msg = "Injected exception for test";
88+
89+
Consumers<Integer> consumers = new Consumers<>(executor, v -> {
90+
throw new RuntimeException(msg);
91+
});
92+
93+
consumers.start("test");
94+
consumers.provide(1);
95+
96+
try {
97+
consumers.await();
98+
Assert.fail("Expected await() to throw when consumer throws");
99+
} catch (Throwable t) {
100+
Throwable root = t.getCause() != null ? t.getCause() : t;
101+
Assert.assertTrue("Expected RuntimeException, but got: " + root,
102+
root instanceof RuntimeException);
103+
Assert.assertTrue("Exception message should contain injected message",
104+
root.getMessage() != null &&
105+
root.getMessage().contains(msg));
106+
}
107+
} finally {
108+
executor.shutdownNow();
109+
}
110+
}
111+
}

0 commit comments

Comments
 (0)