Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hugegraph;

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -577,11 +578,7 @@ private BackendStoreProvider loadStoreProvider() {
private AbstractSerializer serializer() {
String name = this.configuration.get(CoreOptions.SERIALIZER);
LOG.debug("Loading serializer '{}' for graph '{}'", name, this.name);
AbstractSerializer serializer = SerializerFactory.serializer(this.configuration, name);
if (serializer == null) {
throw new HugeException("Can't load serializer with name " + name);
}
return serializer;
return SerializerFactory.serializer(this.configuration, name);
}

private Analyzer analyzer() {
Expand All @@ -597,7 +594,7 @@ protected void reloadRamtable() {
}

protected void reloadRamtable(boolean loadFromFile) {
// Expect triggered manually, like gremlin job
// Expect triggered manually, like a gremlin job
if (this.ramtable != null) {
this.ramtable.reload(loadFromFile, this.name);
} else {
Expand Down Expand Up @@ -1615,37 +1612,51 @@ public SysTransaction(HugeGraphParams graph, BackendStore store) {

private static class AbstractCacheNotifier implements CacheNotifier {

public static final Logger LOG = Log.logger(AbstractCacheNotifier.class);

private final EventHub hub;
private final EventListener cacheEventListener;

public AbstractCacheNotifier(EventHub hub, CacheNotifier proxy) {
this.hub = hub;
this.cacheEventListener = event -> {
Object[] args = event.args();
E.checkArgument(args.length > 0 && args[0] instanceof String,
"Expect event action argument");
if (Cache.ACTION_INVALIDED.equals(args[0])) {
event.checkArgs(String.class, HugeType.class, Object.class);
HugeType type = (HugeType) args[1];
Object ids = args[2];
if (ids instanceof Id[]) {
// argument type mismatch: proxy.invalid2(type,Id[]ids)
proxy.invalid2(type, (Id[]) ids);
} else if (ids instanceof Id) {
proxy.invalid(type, (Id) ids);
} else {
E.checkArgument(false, "Unexpected argument: %s", ids);
try {
LOG.info("Received event: {}", event);
Object[] args = event.args();
E.checkArgument(args.length > 0 && args[0] instanceof String,
"Expect event action argument");
String action = (String) args[0];
LOG.debug("Event action: {}", action);
if (Cache.ACTION_INVALIDED.equals(action)) {
event.checkArgs(String.class, HugeType.class, Object.class);
HugeType type = (HugeType) args[1];
Object ids = args[2];
if (ids instanceof Id[]) {
LOG.debug("Calling proxy.invalid2 with type: {}, IDs: {}", type, Arrays.toString((Id[]) ids));
proxy.invalid2(type, (Id[]) ids);
} else if (ids instanceof Id) {
LOG.debug("Calling proxy.invalid with type: {}, ID: {}", type, ids);
proxy.invalid(type, (Id) ids);
} else {
LOG.error("Unexpected argument: {}", ids);
E.checkArgument(false, "Unexpected argument: %s", ids);
}
return true;
} else if (Cache.ACTION_CLEARED.equals(action)) {
event.checkArgs(String.class, HugeType.class);
HugeType type = (HugeType) args[1];
LOG.debug("Calling proxy.clear with type: {}", type);
proxy.clear(type);
return true;
}
return true;
} else if (Cache.ACTION_CLEARED.equals(args[0])) {
event.checkArgs(String.class, HugeType.class);
HugeType type = (HugeType) args[1];
proxy.clear(type);
return true;
} catch (Exception e) {
LOG.error("Error processing cache event: {}", e.getMessage(), e);
}
LOG.warn("Event {} not handled",event);
return false;
};
this.hub.listen(Events.CACHE, this.cacheEventListener);
LOG.info("Cache event listener registered successfully. cacheEventListener {}",this.cacheEventListener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ private void listenChanges() {
}
return false;
};
this.store().provider().listen(this.storeEventListener);
if(storeEventListenStatus.putIfAbsent(this.params().name(),true)==null){
this.store().provider().listen(this.storeEventListener);
}

// Listen cache event: "cache"(invalid cache item)
this.cacheEventListener = event -> {
Expand Down Expand Up @@ -182,19 +184,21 @@ private void listenChanges() {
}
return false;
};
EventHub graphEventHub = this.params().graphEventHub();
if (!graphEventHub.containsListener(Events.CACHE)) {
if(graphCacheListenStatus.putIfAbsent(this.params().name(),true)==null){
EventHub graphEventHub = this.params().graphEventHub();
graphEventHub.listen(Events.CACHE, this.cacheEventListener);
}
}

private void unlistenChanges() {
// Unlisten store event
this.store().provider().unlisten(this.storeEventListener);

// Unlisten cache event
EventHub graphEventHub = this.params().graphEventHub();
graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
String graphName = this.params().name();
if (graphCacheListenStatus.remove(graphName) != null) {
EventHub graphEventHub = this.params().graphEventHub();
graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
}
if (storeEventListenStatus.remove(graphName) != null) {
this.store().provider().unlisten(this.storeEventListener);
}
}

private void notifyChanges(String action, HugeType type, Id[] ids) {
Expand Down
Loading
Loading