Skip to content

Commit 3265dde

Browse files
author
Michael Rappazzo
committed
properties: upgrade HBaseVertex to support multi-properties
The HBase graph tinkerpop implementation specifies Cardinality.single as the default. Since HBase does not (at the time of this writing) support multiple values, this feature wraps the multi-values in a list and writes the list to HBase. As a consequence of this list wrapper, indexed properties can still only support-single value properties. When reading/writing from HBase, most properties will be written as single objects. Only when a property is specifically added with non-single cardinality will it be written to HBase as a list. In order to avoid java native serialization, the serialization of non-single property values uses Kryo (with a small lightweight wrapper class to accomplish this).
1 parent 29b0ea4 commit 3265dde

15 files changed

+564
-172
lines changed

src/main/java/io/hgraphdb/HBaseBulkLoader.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
package io.hgraphdb;
22

3-
import io.hgraphdb.mutators.*;
3+
import io.hgraphdb.mutators.Creator;
4+
import io.hgraphdb.mutators.EdgeIndexRemover;
5+
import io.hgraphdb.mutators.EdgeIndexWriter;
6+
import io.hgraphdb.mutators.EdgeWriter;
7+
import io.hgraphdb.mutators.PropertyWriter;
8+
import io.hgraphdb.mutators.VertexIndexRemover;
9+
import io.hgraphdb.mutators.VertexIndexWriter;
10+
import io.hgraphdb.mutators.VertexWriter;
11+
412
import org.apache.hadoop.hbase.TableName;
513
import org.apache.hadoop.hbase.client.BufferedMutator;
614
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
@@ -84,7 +92,7 @@ public Vertex addVertex(final Object... keyValues) {
8492
idValue = HBaseGraphUtils.generateIdIfNeeded(idValue);
8593
long now = System.currentTimeMillis();
8694
HBaseVertex vertex = new HBaseVertex(graph, idValue, label, now, now,
87-
HBaseGraphUtils.propertiesToMap(keyValues));
95+
HBaseGraphUtils.propertiesToMultimap(keyValues));
8896
vertex.validate();
8997

9098
Iterator<IndexMetadata> indices = vertex.getIndices(OperationType.WRITE);
@@ -192,14 +200,14 @@ public void setProperty(Vertex vertex, String key, Object value) {
192200
boolean hasIndex = v.hasIndex(OperationType.WRITE, key);
193201
if (hasIndex) {
194202
// only load old value if using index
195-
oldValue = v.getProperty(key);
203+
oldValue = v.getSingleProperty(key).orElse(null);
196204
if (oldValue != null && !oldValue.equals(value)) {
197205
VertexIndexRemover indexRemover = new VertexIndexRemover(graph, v, key, null);
198206
if (vertexIndicesMutator != null) vertexIndicesMutator.mutate(getMutationList(indexRemover.constructMutations()));
199207
}
200208
}
201209

202-
v.getProperties().put(key, value);
210+
v.cacheProperty(key, value);
203211
v.updatedAt(System.currentTimeMillis());
204212

205213
if (hasIndex) {
@@ -220,7 +228,8 @@ private List<? extends Mutation> getMutationList(Iterator<? extends Mutation> mu
220228
m -> m.setDurability(skipWAL ? Durability.SKIP_WAL : Durability.USE_DEFAULT)));
221229
}
222230

223-
public void close() {
231+
@Override
232+
public void close() {
224233
try {
225234
if (edgesMutator != null) edgesMutator.close();
226235
if (edgeIndicesMutator != null) edgeIndicesMutator.close();

src/main/java/io/hgraphdb/HBaseEdge.java

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
import io.hgraphdb.models.EdgeIndexModel;
44
import io.hgraphdb.models.EdgeModel;
5+
import io.hgraphdb.mutators.Mutator;
6+
import io.hgraphdb.mutators.Mutators;
7+
58
import org.apache.tinkerpop.gremlin.structure.Direction;
69
import org.apache.tinkerpop.gremlin.structure.Edge;
710
import org.apache.tinkerpop.gremlin.structure.Property;
@@ -12,15 +15,22 @@
1215
import org.slf4j.Logger;
1316
import org.slf4j.LoggerFactory;
1417

18+
import java.util.Collections;
1519
import java.util.Iterator;
1620
import java.util.Map;
21+
import java.util.Set;
22+
import java.util.Map.Entry;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.stream.Stream;
1725

1826
public class HBaseEdge extends HBaseElement implements Edge {
1927

2028
private static final Logger LOGGER = LoggerFactory.getLogger(HBaseEdge.class);
2129

2230
private Vertex inVertex;
2331
private Vertex outVertex;
32+
protected Map<String, Object> properties;
33+
protected transient boolean propertiesFullyLoaded;
2434

2535
public HBaseEdge(HBaseGraph graph, Object id) {
2636
this(graph, id, null, null, null, null, null, null);
@@ -37,10 +47,12 @@ public HBaseEdge(HBaseGraph graph, Object id, String label, Long createdAt, Long
3747

3848
public HBaseEdge(HBaseGraph graph, Object id, String label, Long createdAt, Long updatedAt, Map<String, Object> properties,
3949
boolean propertiesFullyLoaded, Vertex inVertex, Vertex outVertex) {
40-
super(graph, id, label, createdAt, updatedAt, properties, propertiesFullyLoaded);
50+
super(graph, id, label, createdAt, updatedAt);
4151

4252
this.inVertex = inVertex;
4353
this.outVertex = outVertex;
54+
this.properties = properties;
55+
this.propertiesFullyLoaded = propertiesFullyLoaded;
4456
}
4557

4658
@Override
@@ -55,13 +67,26 @@ public ElementType getElementType() {
5567
return ElementType.EDGE;
5668
}
5769

70+
public Map<String, Object> getProperties() {
71+
if (properties == null || !propertiesFullyLoaded) {
72+
load();
73+
propertiesFullyLoaded = true;
74+
}
75+
return properties;
76+
}
77+
5878
@Override
5979
public void copyFrom(HBaseElement element) {
6080
super.copyFrom(element);
6181
if (element instanceof HBaseEdge) {
6282
HBaseEdge copy = (HBaseEdge) element;
6383
if (copy.inVertex != null) this.inVertex = copy.inVertex;
6484
if (copy.outVertex != null) this.outVertex = copy.outVertex;
85+
if (copy.properties != null
86+
&& (copy.propertiesFullyLoaded || this.properties == null)) {
87+
this.properties = new ConcurrentHashMap<>(copy.properties);
88+
this.propertiesFullyLoaded = copy.propertiesFullyLoaded;
89+
}
6590
}
6691
}
6792

@@ -100,6 +125,101 @@ public Vertex getVertex(Direction direction) throws IllegalArgumentException {
100125
return Direction.IN.equals(direction) ? inVertex : outVertex;
101126
}
102127

128+
public void setProperty(String key, Object value) {
129+
ElementHelper.validateProperty(key, value);
130+
131+
graph.validateProperty(getElementType(), label, key, value);
132+
133+
// delete from index model before setting property
134+
Object oldValue = null;
135+
boolean hasIndex = hasIndex(OperationType.WRITE, key);
136+
if (hasIndex) {
137+
// only load old value if using index
138+
oldValue = getProperty(key);
139+
if (oldValue != null && !oldValue.equals(value)) {
140+
deleteFromIndexModel(key, null);
141+
}
142+
}
143+
144+
getProperties().put(key, value);
145+
updatedAt(System.currentTimeMillis());
146+
147+
if (hasIndex) {
148+
if (oldValue == null || !oldValue.equals(value)) {
149+
writeToIndexModel(key);
150+
}
151+
}
152+
Mutator writer = getModel().writeProperty(this, key, value);
153+
Mutators.write(getTable(), writer);
154+
}
155+
156+
public void incrementProperty(String key, long value) {
157+
if (!graph.configuration().getUseSchema()) {
158+
throw new HBaseGraphNoSchemaException("Schema not enabled");
159+
}
160+
ElementHelper.validateProperty(key, value);
161+
162+
graph.validateProperty(getElementType(), label, key, value);
163+
164+
updatedAt(System.currentTimeMillis());
165+
166+
Mutator writer = getModel().incrementProperty(this, key, value);
167+
long newValue = Mutators.increment(getTable(), writer, key);
168+
getProperties().put(key, newValue);
169+
}
170+
171+
@Override
172+
public Set<String> keys() {
173+
return Collections.unmodifiableSet(properties.keySet());
174+
}
175+
176+
public void removeProperty(String key) {
177+
Object value = getProperty(key);
178+
if (value != null) {
179+
// delete from index model before removing property
180+
boolean hasIndex = hasIndex(OperationType.WRITE, key);
181+
if (hasIndex) {
182+
deleteFromIndexModel(key, null);
183+
}
184+
185+
getProperties().remove(key);
186+
updatedAt(System.currentTimeMillis());
187+
188+
Mutator writer = getModel().clearProperty(this, key);
189+
Mutators.write(getTable(), writer);
190+
}
191+
}
192+
193+
@Override
194+
public Stream<Entry<String, Object>> propertyEntriesStream() {
195+
return properties.entrySet().stream();
196+
}
197+
198+
@Override
199+
public int propertySize() {
200+
return this.properties.size();
201+
}
202+
203+
@SuppressWarnings("unchecked")
204+
public <V> V getProperty(String key) {
205+
if (properties != null) {
206+
// optimization for partially loaded properties
207+
V val = (V) properties.get(key);
208+
if (val != null) return val;
209+
}
210+
return (V) getProperties().get(key);
211+
}
212+
213+
@Override
214+
public boolean hasProperty(String key) {
215+
if (properties != null) {
216+
// optimization for partially loaded properties
217+
Object val = properties.get(key);
218+
if (val != null) return true;
219+
}
220+
return keys().contains(key);
221+
}
222+
103223
@Override
104224
public void remove() {
105225
// Get rid of the endpoints and edge themselves.
@@ -115,7 +235,7 @@ public void remove() {
115235

116236
@Override
117237
public <V> Iterator<Property<V>> properties(final String... propertyKeys) {
118-
Iterable<String> keys = getPropertyKeys();
238+
Iterable<String> keys = keys();
119239
Iterator<String> filter = IteratorUtils.filter(keys.iterator(),
120240
key -> ElementHelper.keyExists(key, propertyKeys));
121241
return IteratorUtils.map(filter,

0 commit comments

Comments
 (0)