Skip to content

Commit c99cfcd

Browse files
haohao0103imbajin
andauthored
fix(server):fix graph server cache notifier mechanism (#2729)
* #2728 * fix some typo & tiny improve --------- Co-authored-by: imbajin <jin@apache.org>
1 parent a4cb44e commit c99cfcd

File tree

3 files changed

+97
-80
lines changed

3 files changed

+97
-80
lines changed

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.hugegraph;
1919

20+
import java.util.Arrays;
2021
import java.util.Collection;
2122
import java.util.Iterator;
2223
import java.util.List;
@@ -577,11 +578,7 @@ private BackendStoreProvider loadStoreProvider() {
577578
private AbstractSerializer serializer() {
578579
String name = this.configuration.get(CoreOptions.SERIALIZER);
579580
LOG.debug("Loading serializer '{}' for graph '{}'", name, this.name);
580-
AbstractSerializer serializer = SerializerFactory.serializer(this.configuration, name);
581-
if (serializer == null) {
582-
throw new HugeException("Can't load serializer with name " + name);
583-
}
584-
return serializer;
581+
return SerializerFactory.serializer(this.configuration, name);
585582
}
586583

587584
private Analyzer analyzer() {
@@ -597,7 +594,7 @@ protected void reloadRamtable() {
597594
}
598595

599596
protected void reloadRamtable(boolean loadFromFile) {
600-
// Expect triggered manually, like gremlin job
597+
// Expect triggered manually, like a gremlin job
601598
if (this.ramtable != null) {
602599
this.ramtable.reload(loadFromFile, this.name);
603600
} else {
@@ -1615,37 +1612,51 @@ public SysTransaction(HugeGraphParams graph, BackendStore store) {
16151612

16161613
private static class AbstractCacheNotifier implements CacheNotifier {
16171614

1615+
public static final Logger LOG = Log.logger(AbstractCacheNotifier.class);
1616+
16181617
private final EventHub hub;
16191618
private final EventListener cacheEventListener;
16201619

16211620
public AbstractCacheNotifier(EventHub hub, CacheNotifier proxy) {
16221621
this.hub = hub;
16231622
this.cacheEventListener = event -> {
1624-
Object[] args = event.args();
1625-
E.checkArgument(args.length > 0 && args[0] instanceof String,
1626-
"Expect event action argument");
1627-
if (Cache.ACTION_INVALIDED.equals(args[0])) {
1628-
event.checkArgs(String.class, HugeType.class, Object.class);
1629-
HugeType type = (HugeType) args[1];
1630-
Object ids = args[2];
1631-
if (ids instanceof Id[]) {
1632-
// argument type mismatch: proxy.invalid2(type,Id[]ids)
1633-
proxy.invalid2(type, (Id[]) ids);
1634-
} else if (ids instanceof Id) {
1635-
proxy.invalid(type, (Id) ids);
1636-
} else {
1637-
E.checkArgument(false, "Unexpected argument: %s", ids);
1623+
try {
1624+
LOG.info("Received event: {}", event);
1625+
Object[] args = event.args();
1626+
E.checkArgument(args.length > 0 && args[0] instanceof String,
1627+
"Expect event action argument");
1628+
String action = (String) args[0];
1629+
LOG.debug("Event action: {}", action);
1630+
if (Cache.ACTION_INVALIDED.equals(action)) {
1631+
event.checkArgs(String.class, HugeType.class, Object.class);
1632+
HugeType type = (HugeType) args[1];
1633+
Object ids = args[2];
1634+
if (ids instanceof Id[]) {
1635+
LOG.debug("Calling proxy.invalid2 with type: {}, IDs: {}", type, Arrays.toString((Id[]) ids));
1636+
proxy.invalid2(type, (Id[]) ids);
1637+
} else if (ids instanceof Id) {
1638+
LOG.debug("Calling proxy.invalid with type: {}, ID: {}", type, ids);
1639+
proxy.invalid(type, (Id) ids);
1640+
} else {
1641+
LOG.error("Unexpected argument: {}", ids);
1642+
E.checkArgument(false, "Unexpected argument: %s", ids);
1643+
}
1644+
return true;
1645+
} else if (Cache.ACTION_CLEARED.equals(action)) {
1646+
event.checkArgs(String.class, HugeType.class);
1647+
HugeType type = (HugeType) args[1];
1648+
LOG.debug("Calling proxy.clear with type: {}", type);
1649+
proxy.clear(type);
1650+
return true;
16381651
}
1639-
return true;
1640-
} else if (Cache.ACTION_CLEARED.equals(args[0])) {
1641-
event.checkArgs(String.class, HugeType.class);
1642-
HugeType type = (HugeType) args[1];
1643-
proxy.clear(type);
1644-
return true;
1652+
} catch (Exception e) {
1653+
LOG.error("Error processing cache event: {}", e.getMessage(), e);
16451654
}
1655+
LOG.warn("Event {} not handled",event);
16461656
return false;
16471657
};
16481658
this.hub.listen(Events.CACHE, this.cacheEventListener);
1659+
LOG.info("Cache event listener registered successfully. cacheEventListener {}",this.cacheEventListener);
16491660
}
16501661

16511662
@Override

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,9 @@ private void listenChanges() {
133133
}
134134
return false;
135135
};
136-
this.store().provider().listen(this.storeEventListener);
136+
if(storeEventListenStatus.putIfAbsent(this.params().name(),true)==null){
137+
this.store().provider().listen(this.storeEventListener);
138+
}
137139

138140
// Listen cache event: "cache"(invalid cache item)
139141
this.cacheEventListener = event -> {
@@ -182,19 +184,21 @@ private void listenChanges() {
182184
}
183185
return false;
184186
};
185-
EventHub graphEventHub = this.params().graphEventHub();
186-
if (!graphEventHub.containsListener(Events.CACHE)) {
187+
if(graphCacheListenStatus.putIfAbsent(this.params().name(),true)==null){
188+
EventHub graphEventHub = this.params().graphEventHub();
187189
graphEventHub.listen(Events.CACHE, this.cacheEventListener);
188190
}
189191
}
190192

191193
private void unlistenChanges() {
192-
// Unlisten store event
193-
this.store().provider().unlisten(this.storeEventListener);
194-
195-
// Unlisten cache event
196-
EventHub graphEventHub = this.params().graphEventHub();
197-
graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
194+
String graphName = this.params().name();
195+
if (graphCacheListenStatus.remove(graphName) != null) {
196+
EventHub graphEventHub = this.params().graphEventHub();
197+
graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
198+
}
199+
if (storeEventListenStatus.remove(graphName) != null) {
200+
this.store().provider().unlisten(this.storeEventListener);
201+
}
198202
}
199203

200204
private void notifyChanges(String action, HugeType type, Id[] ids) {

0 commit comments

Comments
 (0)