Skip to content

Commit a77416e

Browse files
sideshowcodersurybala
authored and committed
Support Libmemcached ketama weighted
- Add support for an alternative Ketama node key format. Libmemcached uses the format `[hostname or ip][port unless default]-[repetition]` while spymemcached has been using `[hostname]/[ip]:[port]-[repetition]`; the added `KetamaNodeKeyFormat` allows choosing either format while retaining the caching optimization. - Add support for weighted ketama. Straight port of the weighting based on Libmemcached, configured by passing a map of node socket address to weight to the configuration. This code is only active if the weights are actually configured; otherwise the old ketama code is used. - Split testLibKetamaCompatTwo into 2 pieces, as its length breaks compiling on Eclipse and Java 8: "In java a methods can't have more than 65535 bytes." http://stackoverflow.com/questions/12257398/how-to-fix-the-code-of-method-is-exceeding-the-65535-bytes-limit Change-Id: I0263b9afc513f9a135d5d17318b3fe6bd4593437 Reviewed-on: http://review.couchbase.org/47624 Reviewed-by: Michael Nitschinger <[email protected]> Tested-by: Michael Nitschinger <[email protected]>
1 parent 27462da commit a77416e

7 files changed

+634
-286
lines changed

src/main/java/net/spy/memcached/KetamaConnectionFactory.java

+48-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@
3333

3434
package net.spy.memcached;
3535

36+
import java.net.InetSocketAddress;
37+
import java.util.HashMap;
3638
import java.util.List;
39+
import java.util.Map;
3740

3841
/**
3942
* ConnectionFactory instance that sets up a ketama compatible connection.
@@ -51,6 +54,9 @@
5154
*/
5255
public class KetamaConnectionFactory extends DefaultConnectionFactory {
5356

57+
private final KetamaNodeKeyFormatter.Format ketamaNodeKeyFormat;
58+
private Map<InetSocketAddress, Integer> weights;
59+
5460
/**
5561
* Create a KetamaConnectionFactory with the given maximum operation
5662
* queue length, and the given read buffer size.
@@ -62,6 +68,32 @@ public KetamaConnectionFactory(ClientMode clientMode, int qLen, int bufSize,
6268
long opQueueMaxBlockTime) {
6369
super(clientMode, qLen, bufSize, DefaultHashAlgorithm.KETAMA_HASH);
6470
}
71+
72+
public KetamaConnectionFactory(int qLen, int bufSize,
73+
long opQueueMaxBlockTime) {
74+
this(qLen, bufSize, opQueueMaxBlockTime,
75+
DefaultHashAlgorithm.KETAMA_HASH,
76+
KetamaNodeKeyFormatter.Format.SPYMEMCACHED,
77+
new HashMap<InetSocketAddress, Integer>());
78+
}
79+
80+
/** Create a KetamaConnectionFactory with the maximum operation queue length,
81+
* the given read buffer size, the maximum time to block waiting operations,
82+
* a specific hash algorithm, a set ring key format, and a given set of
83+
* weights.
84+
*
85+
*/
86+
public KetamaConnectionFactory(
87+
int qLen,
88+
int bufSize,
89+
long opQueueMaxBlockTime,
90+
HashAlgorithm hash,
91+
KetamaNodeKeyFormatter.Format nodeKeyFormat,
92+
Map<InetSocketAddress, Integer> weights) {
93+
super(qLen, bufSize, hash);
94+
this.ketamaNodeKeyFormat = nodeKeyFormat;
95+
this.weights = weights;
96+
}
6597

6698
/**
6799
* Create a KetamaConnectionFactory with the default parameters.
@@ -78,6 +110,21 @@ public KetamaConnectionFactory() {
78110
*/
79111
@Override
80112
public NodeLocator createLocator(List<MemcachedNode> nodes) {
81-
return new KetamaNodeLocator(nodes, getHashAlg());
113+
return new KetamaNodeLocator(nodes, getHashAlg(),
114+
getKetamaNodeKeyFormat(), getWeights());
115+
}
116+
117+
/**
118+
* @return the ketamaNodeKeyFormat
119+
*/
120+
public KetamaNodeKeyFormatter.Format getKetamaNodeKeyFormat() {
121+
return ketamaNodeKeyFormat;
122+
}
123+
124+
/**
125+
* @return the ketama node weights
126+
*/
127+
public Map<InetSocketAddress, Integer> getWeights() {
128+
return weights;
82129
}
83130
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/**
2+
* Copyright (C) 2009-2015 Couchbase, Inc.
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy
5+
* of this software and associated documentation files (the "Software"), to deal
6+
* in the Software without restriction, including without limitation the rights
7+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
* copies of the Software, and to permit persons to whom the Software is
9+
* furnished to do so, subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in
12+
* all copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19+
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING
20+
* IN THE SOFTWARE.
21+
*/
22+
23+
package net.spy.memcached;
24+
25+
import java.net.InetSocketAddress;
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
29+
/**
30+
* Known key formats used in Ketama for assigning nodes around the ring
31+
*/
32+
33+
public class KetamaNodeKeyFormatter {
34+
35+
public enum Format {
36+
/**
37+
* SPYMEMCACHED uses the format traditionally used by spymemcached to map
38+
* nodes to names. The format is HOSTNAME/IP:PORT-ITERATION
39+
*
40+
* <p>
41+
* This default implementation uses the socket-address of the MemcachedNode
42+
* and concatenates it with a hyphen directly against the repetition number
43+
* for example a key for a particular server's first repetition may look like:
44+
* </p>
45+
*
46+
* <p>
47+
* <code>myhost/10.0.2.1-0</code>
48+
* </p>
49+
*
50+
* <p>
51+
* for the second repetition
52+
* </p>
53+
*
54+
* <p>
55+
* <code>myhost/10.0.2.1-1</code>
56+
* </p>
57+
*
58+
* <p>
59+
* for a server where reverse lookups are failing the returned keys may look
60+
* like
61+
* </p>
62+
*
63+
* <p>
64+
* <code>/10.0.2.1-0</code> and <code>/10.0.2.1-1</code> (before getKeyForNode strips the leading "/")
65+
* </p>
66+
*/
67+
SPYMEMCACHED,
68+
69+
/**
70+
* LIBMEMCACHED uses the format traditionally used by libmemcached to map
71+
* nodes to names. The format is HOSTNAME[:PORT]-ITERATION; the PORT is not
72+
* part of the node identifier if it is the default memcached port (11211)
73+
*/
74+
LIBMEMCACHED
75+
}
76+
77+
private final Format format;
78+
79+
public Format getFormat() {
80+
return format;
81+
}
82+
83+
// Carried over from the DefaultKetamaNodeLocatorConfiguration:
84+
// Internal lookup map to try to carry forward the optimisation that was
85+
// previously in KetamaNodeLocator
86+
private Map<MemcachedNode, String> nodeKeys = new HashMap<MemcachedNode, String>();
87+
88+
public KetamaNodeKeyFormatter() {
89+
this(Format.SPYMEMCACHED);
90+
}
91+
92+
public KetamaNodeKeyFormatter(Format format) {
93+
this.format = format;
94+
}
95+
96+
/**
97+
* Returns a uniquely identifying key, suitable for hashing by the
98+
* KetamaNodeLocator algorithm.
99+
*
100+
* @param node The MemcachedNode to use to form the unique identifier
101+
* @param repetition The repetition number for the particular node in question
102+
* (0 is the first repetition)
103+
* @return The key that represents the specific repetition of the node
104+
*/
105+
public String getKeyForNode(MemcachedNode node, int repetition) {
106+
// Carried over from the DefaultKetamaNodeLocatorConfiguration:
107+
// Using the internal map, retrieve the socket addresses
108+
// for given nodes.
109+
// I'm aware that this code is inherently thread-unsafe as
110+
// I'm using a HashMap implementation of the map, but the worst
111+
// case (I believe) is we're slightly inefficient when
112+
// a node has never been seen before concurrently on two different
113+
// threads, so the socket address will be requested multiple times!
114+
// all other cases should be as fast as possible.
115+
String nodeKey = nodeKeys.get(node);
116+
if (nodeKey == null) {
117+
switch(this.format) {
118+
case LIBMEMCACHED:
119+
InetSocketAddress address = (InetSocketAddress)node.getSocketAddress();
120+
nodeKey = address.getHostName();
121+
if (address.getPort() != 11211) {
122+
nodeKey += ":" + address.getPort();
123+
}
124+
break;
125+
case SPYMEMCACHED:
126+
nodeKey = String.valueOf(node.getSocketAddress());
127+
if (nodeKey.startsWith("/")) {
128+
nodeKey = nodeKey.substring(1);
129+
}
130+
break;
131+
default:
132+
assert false;
133+
}
134+
nodeKeys.put(node, nodeKey);
135+
}
136+
return nodeKey + "-" + repetition;
137+
}
138+
}

src/main/java/net/spy/memcached/KetamaNodeLocator.java

+98-20
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@
2727
import net.spy.memcached.util.DefaultKetamaNodeLocatorConfiguration;
2828
import net.spy.memcached.util.KetamaNodeLocatorConfiguration;
2929

30+
import java.net.InetSocketAddress;
3031
import java.util.ArrayList;
3132
import java.util.Collection;
33+
import java.util.HashMap;
3234
import java.util.Iterator;
3335
import java.util.List;
3436
import java.util.Map;
@@ -51,6 +53,8 @@ public final class KetamaNodeLocator extends SpyObject implements NodeLocator {
5153
private volatile Collection<MemcachedNode> allNodes;
5254

5355
private final HashAlgorithm hashAlg;
56+
private final Map<InetSocketAddress, Integer> weights;
57+
private final boolean isWeightedKetama;
5458
private final KetamaNodeLocatorConfiguration config;
5559

5660
/**
@@ -63,7 +67,26 @@ public final class KetamaNodeLocator extends SpyObject implements NodeLocator {
6367
* consistent hash continuum
6468
*/
6569
public KetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg) {
66-
this(nodes, alg, new DefaultKetamaNodeLocatorConfiguration());
70+
this(nodes, alg, KetamaNodeKeyFormatter.Format.SPYMEMCACHED, new HashMap<InetSocketAddress, Integer>());
71+
}
72+
73+
/**
74+
* Create a new KetamaNodeLocator with specific nodes, hash, node key format,
75+
* and weight
76+
*
77+
* @param nodes The List of nodes to use in the Ketama consistent hash
78+
* continuum
79+
* @param alg The hash algorithm to use when choosing a node in the Ketama
80+
* consistent hash continuum
81+
* @param nodeKeyFormat the format used to name the nodes in Ketama, either
82+
* SPYMEMCACHED or LIBMEMCACHED
83+
* @param weights node weights for ketama, a map from InetSocketAddress to
84+
* weight as Integer
85+
*/
86+
public KetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg,
87+
KetamaNodeKeyFormatter.Format nodeKeyFormat,
88+
Map<InetSocketAddress, Integer> weights) {
89+
this(nodes, alg, weights, new DefaultKetamaNodeLocatorConfiguration(new KetamaNodeKeyFormatter(nodeKeyFormat)));
6790
}
6891

6992
/**
@@ -78,21 +101,44 @@ public KetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg) {
78101
*/
79102
public KetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg,
80103
KetamaNodeLocatorConfiguration conf) {
104+
this(nodes, alg, new HashMap<InetSocketAddress, Integer>(), conf);
105+
}
106+
107+
/**
108+
* Create a new KetamaNodeLocator with specific nodes, hash, configuration,
109+
* and weight
110+
*
111+
* @param nodes The List of nodes to use in the Ketama consistent hash
112+
* continuum
113+
* @param alg The hash algorithm to use when choosing a node in the Ketama
114+
* consistent hash continuum
115+
* @param nodeWeights node weights for ketama, a map from InetSocketAddress to
116+
* weight as Integer
117+
* @param configuration node locator configuration
118+
*/
119+
public KetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg,
120+
Map<InetSocketAddress, Integer> nodeWeights,
121+
KetamaNodeLocatorConfiguration configuration) {
81122
super();
82123
allNodes = nodes;
83124
hashAlg = alg;
84-
config = conf;
125+
config = configuration;
126+
weights = nodeWeights;
127+
isWeightedKetama = !weights.isEmpty();
85128
setKetamaNodes(nodes);
86129
}
87130

88131
private KetamaNodeLocator(TreeMap<Long, MemcachedNode> smn,
89132
Collection<MemcachedNode> an, HashAlgorithm alg,
133+
Map<InetSocketAddress, Integer> nodeWeights,
90134
KetamaNodeLocatorConfiguration conf) {
91135
super();
92136
ketamaNodes = smn;
93137
allNodes = an;
94138
hashAlg = alg;
95139
config = conf;
140+
weights = nodeWeights;
141+
isWeightedKetama = !weights.isEmpty();
96142
}
97143

98144
public Collection<MemcachedNode> getAll() {
@@ -147,7 +193,7 @@ public NodeLocator getReadonlyCopy() {
147193
an.add(new MemcachedNodeROImpl(n));
148194
}
149195

150-
return new KetamaNodeLocator(smn, an, hashAlg, config);
196+
return new KetamaNodeLocator(smn, an, hashAlg, weights, config);
151197
}
152198

153199
@Override
@@ -172,30 +218,62 @@ protected TreeMap<Long, MemcachedNode> getKetamaNodes() {
172218
*/
173219
protected void setKetamaNodes(List<MemcachedNode> nodes) {
174220
TreeMap<Long, MemcachedNode> newNodeMap =
175-
new TreeMap<Long, MemcachedNode>();
221+
new TreeMap<Long, MemcachedNode>();
176222
int numReps = config.getNodeRepetitions();
223+
int nodeCount = nodes.size();
224+
int totalWeight = 0;
225+
226+
if (isWeightedKetama) {
227+
for (MemcachedNode node : nodes) {
228+
totalWeight += weights.get(node.getSocketAddress());
229+
}
230+
}
231+
177232
for (MemcachedNode node : nodes) {
178-
// Ketama does some special work with md5 where it reuses chunks.
179-
if (hashAlg == DefaultHashAlgorithm.KETAMA_HASH) {
180-
for (int i = 0; i < numReps / 4; i++) {
181-
byte[] digest =
182-
DefaultHashAlgorithm.computeMd5(config.getKeyForNode(node, i));
183-
for (int h = 0; h < 4; h++) {
184-
Long k = ((long) (digest[3 + h * 4] & 0xFF) << 24)
185-
| ((long) (digest[2 + h * 4] & 0xFF) << 16)
186-
| ((long) (digest[1 + h * 4] & 0xFF) << 8)
187-
| (digest[h * 4] & 0xFF);
188-
newNodeMap.put(k, node);
189-
getLogger().debug("Adding node %s in position %d", node, k);
233+
if (isWeightedKetama) {
234+
235+
int thisWeight = weights.get(node.getSocketAddress());
236+
float percent = (float)thisWeight / (float)totalWeight;
237+
int pointerPerServer = (int)((Math.floor((float)(percent * (float)config.getNodeRepetitions() / 4 * (float)nodeCount + 0.0000000001))) * 4);
238+
for (int i = 0; i < pointerPerServer / 4; i++) {
239+
for(long position : ketamaNodePositionsAtIteration(node, i)) {
240+
newNodeMap.put(position, node);
241+
getLogger().debug("Adding node %s with weight %s in position %d", node, thisWeight, position);
242+
}
190243
}
191-
}
192244
} else {
193-
for (int i = 0; i < numReps; i++) {
194-
newNodeMap.put(hashAlg.hash(config.getKeyForNode(node, i)), node);
195-
}
245+
// Ketama does some special work with md5 where it reuses chunks.
246+
// Check to be backwards compatible, the hash algorithm does not
247+
// matter for Ketama, just the placement should always be done using
248+
// MD5
249+
if (hashAlg == DefaultHashAlgorithm.KETAMA_HASH) {
250+
for (int i = 0; i < numReps / 4; i++) {
251+
for(long position : ketamaNodePositionsAtIteration(node, i)) {
252+
newNodeMap.put(position, node);
253+
getLogger().debug("Adding node %s in position %d", node, position);
254+
}
255+
}
256+
} else {
257+
for (int i = 0; i < numReps; i++) {
258+
newNodeMap.put(hashAlg.hash(config.getKeyForNode(node, i)), node);
259+
}
260+
}
196261
}
197262
}
198263
assert newNodeMap.size() == numReps * nodes.size();
199264
ketamaNodes = newNodeMap;
200265
}
266+
267+
private List<Long> ketamaNodePositionsAtIteration(MemcachedNode node, int iteration) {
268+
List<Long> positions = new ArrayList<Long>();
269+
byte[] digest = DefaultHashAlgorithm.computeMd5(config.getKeyForNode(node, iteration));
270+
for (int h = 0; h < 4; h++) {
271+
Long k = ((long) (digest[3 + h * 4] & 0xFF) << 24)
272+
| ((long) (digest[2 + h * 4] & 0xFF) << 16)
273+
| ((long) (digest[1 + h * 4] & 0xFF) << 8)
274+
| (digest[h * 4] & 0xFF);
275+
positions.add(k);
276+
}
277+
return positions;
278+
}
201279
}

0 commit comments

Comments
 (0)