Skip to content

Commit 8032480

Browse files
author
Miriam Pena
authored
Merge pull request #14 from AdRoll/cas_ops
Support CAS
2 parents 59ebec4 + ad87cb3 commit 8032480

File tree

8 files changed

+408
-116
lines changed

8 files changed

+408
-116
lines changed

include/mero.hrl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,8 @@
5858
{host, Host},
5959
{port, Port}]}).
6060

61+
-record(mero_item, {key,
62+
value,
63+
cas}).
64+
6165
-endif.

src/mero.erl

Lines changed: 83 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,16 @@
4242
mincrement_counter/7,
4343
get/2,
4444
get/3,
45+
gets/2,
46+
gets/3,
4547
delete/3,
4648
mdelete/3,
4749
mget/2,
4850
mget/3,
51+
mgets/2,
52+
mgets/3,
4953
set/5,
54+
cas/6,
5055
add/5,
5156
flush_all/1,
5257
shard_phash2/2,
@@ -60,6 +65,7 @@
6065

6166
-include_lib("mero/include/mero.hrl").
6267

68+
-type cas_token() :: undefined | integer().
6369

6470
%%%=============================================================================
6571
%%% Application behaviour
@@ -80,38 +86,73 @@ stop(_State) ->
8086
%%% External functions
8187
%%%=============================================================================
8288

83-
%% @doc: Gets the value of a key in a specific cluster
84-
-spec get(ClusterName :: atom(), Key :: binary()) ->
85-
{Key :: binary(), Value :: undefined | integer()}
86-
| {error, Reason :: term()}.
87-
get(ClusterName, Key) when is_binary(Key), is_atom(ClusterName) ->
88-
get(ClusterName, Key, mero_conf:timeout_read()).
89-
9089
-spec get(ClusterName :: atom(), Key :: binary(), Timeout :: integer()) ->
91-
{Key :: binary(), Value :: undefined | binary()}
92-
| {error, Reason :: term()}.
93-
get(ClusterName, Key, Timeout) when is_binary(Key), is_atom(ClusterName) ->
94-
case mero_conn:get(ClusterName, [Key], Timeout) of
95-
[{Key, Value}] ->
96-
{Key, Value};
97-
{error, [Reason], []} ->
90+
{Key :: binary(), Value :: undefined | binary()}
91+
| {error, Reason :: term()}.
92+
get(ClusterName, Key, Timeout) ->
93+
case gets(ClusterName, Key, Timeout) of
94+
{error, Reason} ->
9895
{error, Reason};
99-
{error, Reason, []} ->
100-
{error, Reason};
101-
{error, _Reason, [{Key, Value}]} ->
96+
{Key, Value, _CAS} ->
10297
{Key, Value}
10398
end.
99+
get(ClusterName, Key) ->
100+
get(ClusterName, Key, mero_conf:timeout_read()).
104101

105102

106-
%% NOTE: On error we still could have processed part of the request, so we still
107-
%% return all the completed responses.
108103
-spec mget(ClusterName :: atom(), Keys :: [binary()], Timeout :: integer()) ->
109104
[{Key :: binary(), Value :: undefined | binary()}]
110105
| {error, Reason :: term(), ProcessedKeyValues :: [{Key :: binary(), Value :: binary()}]}.
111-
mget(ClusterName, Keys) when is_list(Keys), is_atom(ClusterName) ->
112-
mero_conn:get(ClusterName, Keys, mero_conf:timeout_read()).
113106
mget(ClusterName, Keys, Timeout) when is_list(Keys), is_atom(ClusterName) ->
114-
mero_conn:get(ClusterName, Keys, Timeout).
107+
Extract = fun (Items) ->
108+
[{Key, Value}
109+
|| {Key, Value, _} <- Items]
110+
end,
111+
case mgets(ClusterName, Keys, Timeout) of
112+
{error, Reason, ProcessedKeyValues} ->
113+
{error, Reason, Extract(ProcessedKeyValues)};
114+
KeyValues ->
115+
Extract(KeyValues)
116+
end.
117+
mget(ClusterName, Keys) ->
118+
mget(ClusterName, Keys, mero_conf:timeout_read()).
119+
120+
121+
-spec gets(ClusterName :: atom(), Key :: binary(), Timeout :: integer()) ->
122+
{Key :: binary(), Value :: undefined | binary(), CAS :: cas_token()}
123+
| {error, Reason :: term()}.
124+
gets(ClusterName, Key, Timeout) ->
125+
case mgets(ClusterName, [Key], Timeout) of
126+
{error, [Reason], []} ->
127+
{error, Reason};
128+
{error, _Reason, [Processed]} ->
129+
Processed;
130+
[Result] ->
131+
Result;
132+
[] ->
133+
{Key, undefined, undefined}
134+
end.
135+
gets(ClusterName, Key) ->
136+
gets(ClusterName, Key, mero_conf:timeout_read()).
137+
138+
139+
-spec mgets(ClusterName :: atom(), Keys :: [binary()], Timeout :: integer()) ->
140+
[{Key :: binary(), Value :: undefined | binary(), CAS :: cas_token()}]
141+
| {error, Reason :: term(),
142+
ProcessedKeyValues :: [{Key :: binary(), Value :: binary(), CAS :: cas_token()}]}.
143+
mgets(ClusterName, Keys, Timeout) when is_list(Keys), is_atom(ClusterName) ->
144+
Extract = fun (Items) ->
145+
[{Key, Value, CAS}
146+
|| #mero_item{key = Key, value = Value, cas = CAS} <- Items]
147+
end,
148+
case mero_conn:get(ClusterName, Keys, Timeout) of
149+
{error, Reason, ProcessedKeyValues} ->
150+
{error, Reason, Extract(ProcessedKeyValues)};
151+
KeyValues ->
152+
Extract(KeyValues)
153+
end.
154+
mgets(ClusterName, Keys) ->
155+
mgets(ClusterName, Keys, mero_conf:timeout_read()).
115156

116157

117158
-spec add(ClusterName :: atom(), Key :: binary(), Value :: binary(), ExpTime :: integer(),
@@ -122,14 +163,29 @@ add(ClusterName, Key, Value, ExpTime, Timeout)
122163
BExpTime = list_to_binary(integer_to_list(ExpTime)),
123164
mero_conn:add(ClusterName, Key, Value, BExpTime, Timeout).
124165

125-
%% ExpTime is in seconds.
126-
-spec set(ClusterName :: atom(), Key :: binary(), Value :: binary(), ExpTime :: integer(),
127-
Timeout :: integer()) ->
166+
167+
-spec set(ClusterName :: atom(),
168+
Key :: binary(),
169+
Value :: binary(),
170+
ExpTime :: integer(), % value is in seconds
171+
Timeout :: integer()) ->
172+
ok | {error, Reason :: term()}.
173+
set(ClusterName, Key, Value, ExpTime, Timeout) ->
174+
cas(ClusterName, Key, Value, ExpTime, Timeout, undefined).
175+
176+
177+
-spec cas(ClusterName :: atom(),
178+
Key :: binary(),
179+
Value :: binary(),
180+
ExpTime :: integer(), % value is in seconds
181+
Timeout :: integer(),
182+
CAS :: cas_token()) ->
128183
ok | {error, Reason :: term()}.
129-
set(ClusterName, Key, Value, ExpTime, Timeout)
184+
cas(ClusterName, Key, Value, ExpTime, Timeout, CAS)
130185
when is_binary(Key), is_atom(ClusterName), is_binary(Value), is_integer(ExpTime) ->
131186
BExpTime = list_to_binary(integer_to_list(ExpTime)),
132-
mero_conn:set(ClusterName, Key, Value, BExpTime, Timeout).
187+
%% note: if CAS is undefined, this will be an unconditional set:
188+
mero_conn:set(ClusterName, Key, Value, BExpTime, Timeout, CAS).
133189

134190

135191
%% @doc: Increments a counter: initial value is 1, steps of 1, timeout defaults to 24 hours.

src/mero_conn.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
-export([increment_counter/7,
3434
mincrement_counter/7,
3535
get/3,
36-
set/5,
36+
set/6,
3737
delete/3,
3838
mdelete/3,
3939
add/5,
@@ -64,10 +64,10 @@ mincrement_counter(Name, Keys, Value, Initial, ExpTime, _Retries, Timeout) ->
6464
end.
6565

6666

67-
set(Name, Key, Value, ExpTime, Timeout) ->
67+
set(Name, Key, Value, ExpTime, Timeout, CAS) ->
6868
TimeLimit = mero_conf:add_now(Timeout),
6969
PoolName = mero_cluster:server(Name, Key),
70-
pool_execute(PoolName, set, [Key, Value, ExpTime, TimeLimit], TimeLimit).
70+
pool_execute(PoolName, set, [Key, Value, ExpTime, TimeLimit, CAS], TimeLimit).
7171

7272

7373
get(Name, Keys, Timeout) ->

src/mero_wrk_tcp_binary.erl

Lines changed: 72 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,18 @@
3535

3636
%%% Start/stop functions
3737
-export([connect/3,
38-
controlling_process/2,
39-
transaction/3,
40-
close/2]).
38+
controlling_process/2,
39+
transaction/3,
40+
close/2]).
4141

4242

4343
-record(client, {socket, pool, event_callback :: module()}).
4444

4545
-define(SOCKET_OPTIONS, [binary,
46-
{packet, raw},
47-
{active, false},
48-
{reuseaddr, true},
49-
{nodelay, true}]).
46+
{packet, raw},
47+
{active, false},
48+
{reuseaddr, true},
49+
{nodelay, true}]).
5050

5151
%%%=============================================================================
5252
%%% External functions
@@ -75,9 +75,9 @@ controlling_process(Client, Pid) ->
7575

7676
transaction(Client, delete, [Key, TimeLimit]) ->
7777
case send_receive(Client, {?MEMCACHE_DELETE, {Key}}, TimeLimit) of
78-
{ok, {<<>>, <<>>}} ->
78+
{ok, #mero_item{key = <<>>, value = <<>>}} ->
7979
{Client, ok};
80-
{ok, {<<>>, undefined}} ->
80+
{ok, #mero_item{key = <<>>, value = undefined}} ->
8181
{Client, {error, not_found}};
8282
{ok, {error, Reason}} ->
8383
{Client, {error, Reason}};
@@ -92,15 +92,18 @@ transaction(Client, increment_counter, [Key, Value, Initial, ExpTime, TimeLimit]
9292
{error, Reason};
9393
{ok, {error, Reason}} ->
9494
{Client, {error, Reason}};
95-
{ok, {<<>>, ActualValue}} ->
95+
{ok, #mero_item{key = <<>>, value = ActualValue}} ->
9696
{Client, {ok, to_integer(ActualValue)}}
9797
end;
9898

9999

100-
transaction(Client, set, [Key, Value, ExpTime, TimeLimit]) ->
101-
case send_receive(Client, {?MEMCACHE_SET, {Key, Value, ExpTime}}, TimeLimit) of
102-
{ok, {<<>>, <<>>}} ->
100+
transaction(Client, set, [Key, Value, ExpTime, TimeLimit, CAS]) ->
101+
case send_receive(Client, {?MEMCACHE_SET, {Key, Value, ExpTime, CAS}}, TimeLimit) of
102+
{ok, #mero_item{key = <<>>, value = <<>>}} ->
103103
{Client, ok};
104+
{ok, #mero_item{key = <<>>, value = undefined}} when CAS /= undefined ->
105+
%% attempt to set a key using CAS, but key wasn't present.
106+
{Client, {error, not_found}};
104107
{ok, {error, Reason}} ->
105108
{Client, {error, Reason}};
106109
{error, Reason} ->
@@ -109,7 +112,7 @@ transaction(Client, set, [Key, Value, ExpTime, TimeLimit]) ->
109112

110113
transaction(Client, add, [Key, Value, ExpTime, TimeLimit]) ->
111114
case send_receive(Client, {?MEMCACHE_ADD, {Key, Value, ExpTime}}, TimeLimit) of
112-
{ok, {<<>>, <<>>}} ->
115+
{ok, #mero_item{key = <<>>, value = <<>>}} ->
113116
{Client, ok};
114117
{ok, {error, Reason}} ->
115118
{Client, {error, Reason}};
@@ -119,7 +122,7 @@ transaction(Client, add, [Key, Value, ExpTime, TimeLimit]) ->
119122

120123
transaction(Client, flush_all, [TimeLimit]) ->
121124
case send_receive(Client, {?MEMCACHE_FLUSH_ALL, {}}, TimeLimit) of
122-
{ok, {<<>>, <<>>}} ->
125+
{ok, #mero_item{key = <<>>, value = <<>>}} ->
123126
{Client, ok};
124127
{ok, {error, Reason}} ->
125128
{Client, {error, Reason}};
@@ -219,9 +222,9 @@ pack({?MEMCACHE_ADD, {Key, Value, ExpTime}}) ->
219222
IntExpTime = value_to_integer(ExpTime),
220223
pack(<<16#DEADBEEF:32, IntExpTime:32>>, ?MEMCACHE_ADD, Key, Value);
221224

222-
pack({?MEMCACHE_SET, {Key, Value, ExpTime}}) ->
225+
pack({?MEMCACHE_SET, {Key, Value, ExpTime, CAS}}) ->
223226
IntExpTime = value_to_integer(ExpTime),
224-
pack(<<16#DEADBEEF:32, IntExpTime:32>>, ?MEMCACHE_SET, Key, Value);
227+
pack(<<16#DEADBEEF:32, IntExpTime:32>>, ?MEMCACHE_SET, Key, Value, CAS);
225228

226229
pack({?MEMCACHE_FLUSH_ALL, {}}) ->
227230
%% Flush inmediately by default
@@ -239,13 +242,28 @@ pack(Extras, Operator, Key) ->
239242
pack(Extras, Operator, Key, <<>>).
240243

241244
pack(Extras, Operator, Key, Value) ->
245+
pack(Extras, Operator, Key, Value, undefined).
246+
247+
pack(Extras, Operator, Key, Value, undefined) ->
248+
pack(Extras, Operator, Key, Value, 16#00);
249+
250+
pack(Extras, Operator, Key, Value, CAS) ->
242251
KeySize = size(Key),
243252
ExtrasSize = size(Extras),
244253
Body = <<Extras:ExtrasSize/binary, Key/binary, Value/binary>>,
245254
BodySize = size(Body),
246-
<<16#80:8, Operator:8, KeySize:16,
247-
ExtrasSize:8, 16#00:8, 16#00:16,
248-
BodySize:32, 16#00:32, 16#00:64, Body:BodySize/binary>>.
255+
<<
256+
16#80:8, % magic (0)
257+
Operator:8, % opcode (1)
258+
KeySize:16, % key length (2,3)
259+
ExtrasSize:8, % extra length (4)
260+
16#00:8, % data type (5)
261+
16#00:16, % reserved (6,7)
262+
BodySize:32, % total body (8-11)
263+
16#00:32, % opaque (12-15)
264+
CAS:64, % CAS (16-23)
265+
Body:BodySize/binary
266+
>>.
249267

250268

251269
send(Client, Data) ->
@@ -258,17 +276,34 @@ send(Client, Data) ->
258276
end.
259277

260278

279+
cas_value(16#00) ->
280+
undefined;
281+
cas_value(undefined) ->
282+
16#00;
283+
cas_value(Value) when is_integer(Value) andalso Value > 0 ->
284+
Value.
285+
286+
261287
receive_response(Client, Op, TimeLimit) ->
262288
case recv_bytes(Client, 24, TimeLimit) of
263-
<<16#81:8, Op:8, KeySize:16, ExtrasSize:8, _DT:8, Status:16,
264-
BodySize:32, _Opq:32, _CAS:64>> ->
289+
<<
290+
16#81:8, % magic (0)
291+
Op:8, % opcode (1)
292+
KeySize:16, % key length (2,3)
293+
ExtrasSize:8, % extra length (4)
294+
_DT:8, % data type (5)
295+
Status:16, % status (6,7)
296+
BodySize:32, % total body (8-11)
297+
_Opq:32, % opaque (12-15)
298+
CAS:64 % CAS (16-23)
299+
>> ->
265300
case recv_bytes(Client, BodySize, TimeLimit) of
266301
<<_Extras:ExtrasSize/binary, Key:KeySize/binary, Value/binary>> ->
267302
case Status of
268303
16#0001 ->
269-
{Key, undefined};
304+
#mero_item{key = Key, cas = cas_value(CAS)};
270305
16#0000 ->
271-
{Key, Value};
306+
#mero_item{key = Key, value = Value, cas = cas_value(CAS)};
272307
16#0002 ->
273308
{error, already_exists};
274309
16#0003 ->
@@ -362,12 +397,22 @@ async_response(Client, Keys, TimeLimit) ->
362397

363398
receive_response(Client, TimeLimit, Keys, Acc) ->
364399
case recv_bytes(Client, 24, TimeLimit) of
365-
<<16#81:8, Op:8, KeySize:16, ExtrasSize:8, _DT:8, Status:16,
366-
BodySize:32, _Opq:32, _CAS:64>> = Data ->
400+
<<
401+
16#81:8, % magic (0)
402+
Op:8, % opcode (1)
403+
KeySize:16, % key length (2,3)
404+
ExtrasSize:8, % extra length (4)
405+
_DT:8, % data type (5)
406+
Status:16, % status (6,7)
407+
BodySize:32, % total body (8-11)
408+
_Opq:32, % opaque (12-15)
409+
CAS:64 % CAS (16-23)
410+
>> = Data ->
367411
case recv_bytes(Client, BodySize, TimeLimit) of
368412
<<_Extras:ExtrasSize/binary, Key:KeySize/binary, ValueReceived/binary>> ->
369413
{Key, Value} = filter_by_status(Status, Op, Key, ValueReceived),
370-
Responses = [{Key, Value} | Acc],
414+
Responses = [#mero_item{key = Key, value = Value, cas = cas_value(CAS)}
415+
| Acc],
371416
NKeys = lists:delete(Key, Keys),
372417
case Op of
373418
%% elasticache does not return the correct Op for

0 commit comments

Comments
 (0)