-
Notifications
You must be signed in to change notification settings - Fork 21
Expand file tree
/
Copy pathclient.cr
More file actions
205 lines (180 loc) · 7.07 KB
/
Copy pathclient.cr
File metadata and controls
205 lines (180 loc) · 7.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
require "db/pool"
require "log"
require "./connection"
require "./commands"
require "./commands/immediate"
require "./log"
module Redis
# The Redis client is the expected entrypoint for this shard. By default, it will connect to localhost:6379, but you can also supply a `URI` to connect to an arbitrary Redis server. SSL, password authentication, and DB selection are all supported.
#
# ```
# # Connects to localhost:6379
# redis = Redis::Client.new
#
# # Connects to a server at "redis.example.com" on port 6000 over a TLS
# # connection, authenticates with the password "password", and uses DB 3
# redis = Redis::Client.new(URI.parse("rediss://:password@redis.example.com:6000/3"))
#
# # Connects to a server at the URL in `ENV["REDIS_URL"]`
# redis = Redis::Client.from_env("REDIS_URL")
# ```
class Client
include Commands
include Commands::Immediate
getter uri : URI
@pool : DB::Pool(Connection)
@on_attributes = Proc(Attributes, Nil).new { }
# Builds a client from the URL held in the environment variable named
# `env_var`. The variable is read with `ENV[env_var]`, so it must be set;
# there is no fallback value on this path.
def self.from_env(env_var)
  url = ENV[env_var]
  new(URI.parse(url))
end
# Creates a client backed by a pool of connections that expands and
# contracts as needed.
#
# When no `uri` is given, `ENV["REDIS_URL"]` is used, falling back to
# `redis:///`. The pool is tuned through query parameters on the URI:
# `initial_pool_size`, `max_pool_size`, `max_idle_pool_size`,
# `checkout_timeout`, `retry_attempts` and `retry_delay`.
def initialize(@uri = URI.parse(ENV.fetch("REDIS_URL", "redis:///")), @log = Log)
  params = uri.query_params

  # Fallback values follow the crystal-db pool defaults, see
  # https://github.com/crystal-lang/crystal-db/blob/v0.11.0/src/db/pool.cr
  # The exception is `max_idle_pool_size`: crystal-db defaults to 1, but we
  # want to be able to keep at least 25 idle connections around.
  pool_options = DB::Pool::Options.new(
    initial_pool_size: params.fetch("initial_pool_size", 1).to_i,
    max_pool_size: params.fetch("max_pool_size", 0).to_i,
    checkout_timeout: params.fetch("checkout_timeout", 5.0).to_f,
    retry_attempts: params.fetch("retry_attempts", 1).to_i,
    retry_delay: params.fetch("retry_delay", 0.2).to_f,
    max_idle_pool_size: params.fetch("max_idle_pool_size", 25).to_i,
  )

  # The factory block is what the pool calls to build a connection.
  @pool = DB::Pool.new(pool_options) { Connection.new(uri, log: log) }
end
# Checks out a connection and yields every key produced by that connection's
# `scan_each`, passing the `match`, `count` and `type` arguments through
# unchanged.
def scan_each(match pattern : String? = nil, count : String | Int | Nil = nil, type : String? = nil, &) : Nil
  checkout do |connection|
    connection.scan_each(match: pattern, count: count, type: type) do |key|
      yield key
    end
  end
end
# Checks out a connection and yields each `field, value` pair produced by
# that connection's `hscan_each` for `key`, passing `match` and `count`
# through unchanged.
def hscan_each(key : String, *, match pattern : String? = nil, count : String | Int | Nil = nil, &) : Nil
  checkout do |connection|
    connection.hscan_each(key: key, match: pattern, count: count) do |field, value|
      yield field, value
    end
  end
end
# Yielding overload: checks out a connection and yields each member produced
# by that connection's `sscan_each` for `key`, passing `match` and `count`
# through unchanged.
def sscan_each(key : String, *, match pattern : String? = nil, count : String | Int | Nil = nil, &) : Nil
  checkout do |connection|
    connection.sscan_each(key: key, match: pattern, count: count) do |member|
      yield member
    end
  end
end
# Block-less overload: returns an `SScanIterator` for the set at `key`.
# `count`, when present, is converted to a `String` before being handed to
# the iterator; `nil` stays `nil`.
def sscan_each(key : String, *, match pattern : String? = nil, count : String | Int | Nil = nil)
  count_arg = count.try(&.to_s)
  SScanIterator.new(self, key, match: pattern, count: count_arg)
end
# Iterator over the members of the set stored at `key`. Members are fetched
# one page at a time by calling `sscan` on the owning client.
class SScanIterator
  include Iterator(String)

  getter redis : Redis::Client
  getter key : String
  getter pattern : String?
  getter count : String?
  # Cursor sent with the next `sscan` call. Starts at "0".
  getter cursor = "0"
  # Entries of the most recently fetched page; `nil` until the first fetch.
  getter keys : Array(Value)?
  # Index into `keys` of the next entry to hand out.
  getter keys_index = -1
  # Becomes true once a fetch comes back with cursor "0".
  @fetched_last_batch = false

  def initialize(@redis, @key, match @pattern, @count)
  end

  # Returns the next member, fetching further pages on demand, or `stop`
  # once the final page has been consumed.
  #
  # This is a loop rather than a recursive call: a scan may return many
  # consecutive empty pages before the cursor comes back as "0", and
  # recursing once per page would grow the stack without bound.
  def next : String | Stop
    loop do
      if (page = @keys) && (member = page[@keys_index]?)
        @keys_index += 1
        return member.as(String)
      end

      return stop if @fetched_last_batch
      fetch_batch
    end
  end

  # Fetches the next page from the server and resets the read position.
  # Locals are named differently from the `cursor`/`keys` getters so the
  # value sent to `sscan` is unambiguously the stored cursor.
  def fetch_batch
    next_cursor, page = redis.sscan key,
      cursor: @cursor,
      match: pattern,
      count: count

    @fetched_last_batch = true if next_cursor == "0"
    @keys_index = 0
    @cursor = next_cursor.as(String)
    @keys = page.as(Array)
  end
end
# Checks out a connection and yields each `member, score` pair produced by
# that connection's `zscan_each` for `key`, passing `match` and `count`
# through unchanged.
def zscan_each(key : String, *, match pattern : String? = nil, count : String | Int | Nil = nil, &) : Nil
  checkout do |connection|
    connection.zscan_each(key: key, match: pattern, count: count) do |member, score|
      yield member, score
    end
  end
end
# Runs `command` on a connection from the pool.
#
# Every Redis command invoked on the client follows the same path: a
# connection is checked out of the pool, the command is invoked on that
# connection, and the connection is checked back into the pool.
#
# ```
# redis = Redis::Client.new
# ```
def run(command)
  checkout { |connection| connection.run(command) }
end
# Checks out a connection, opens a pipeline on it and yields the pipeline
# object to the caller's block.
def pipeline(&)
  checkout do |connection|
    connection.pipeline do |pipe|
      yield pipe
    end
  end
end
# Checks out a connection, starts a `multi` block on it and yields the
# transaction object to the caller's block.
def multi(&)
  checkout do |connection|
    connection.multi do |txn|
      yield txn
    end
  end
end
# Registers a block to run when any connection managed by this client
# receives a `Redis::Attributes`.
#
# The block is stored on the client and handed to each connection that the
# pool's `each_resource` yields. Returns `self`.
#
# NOTE(review): the pool factory in `initialize` builds connections with
# `Connection.new(uri, log: log)` only, so connections created after this
# call may not get the handler. Confirm whether that is intended.
def on_attributes(&@on_attributes : Attributes -> Nil) : self
  @pool.each_resource { |conn| conn.on_attributes(&on_attributes) }
  self
end
# Watch the given `keys` for changes when you need to fetch them for update
# in a transaction, and yield the connection with that watch active. This
# allows for optimistic updates within the transaction, returning `nil` from
# `Transaction#exec` if any of the keys are modified before the transaction
# completes.
#
# ```
# # Begin watching the `session:123` key, yields the connection that's
# # watching it
# redis.watch "session:123" do |conn|
#   session = Session.from_json(conn.get!("session:123"))
#   session.user_id = user.id
#
#   # Begin a new transaction on the watching connection
#   conn.multi do |txn|
#     txn.set "session:123", session.to_json
#   end
# end
# ```
#
# NOTE: This does not *prevent* concurrent updates the way an RDBMS like
# Postgres does. Redis has no way to prevent that when performing more than
# a single command. However, this pattern allows you to _detect_ a
# concurrent update (the `multi` block returns `nil`), so you'll need to
# design your interactions with the Redis server around this and, if
# necessary, retry the transaction according to your application's needs.
#
# IMPORTANT: Use this sparingly. Whenever feasible, instead of a
# fetch/mutate/save cycle, update keys atomically in Redis. For example, if
# you're using a JSON field, set properties directly with `JSON#set` or
# increment them with `JSON#numincrby`. Redis is designed specifically to
# work that way.
#
# NOTE(review): nothing here clears the watch if the block finishes without
# running a transaction, so the connection presumably goes back to the pool
# still watching `keys`. Confirm whether an explicit unwatch is needed.
def watch(*keys : String, &)
  checkout do |conn|
    conn.watch(*keys)
    yield conn
  end
end
# Checks out a connection, calls `subscribe` on it with `channels`, and
# yields the `subscription, connection` pair that call provides.
def subscribe(*channels, &)
  checkout do |connection|
    connection.subscribe(*channels) do |subscription, conn|
      yield subscription, conn
    end
  end
end
# Checks out a connection, calls `psubscribe` on it with `channels`, and
# yields the `subscription, connection` pair that call provides.
def psubscribe(*channels, &)
  checkout do |connection|
    connection.psubscribe(*channels) do |subscription, conn|
      yield subscription, conn
    end
  end
end
# Closes the client by closing its underlying connection pool.
def close
@pool.close
end
# Yields a connection checked out of the pool, wrapped in the pool's `retry`.
#
# If the caller's block raises an `IO::Error`, the connection is closed
# before the same error is raised again, so the error still reaches the
# caller (or the surrounding `retry`).
private def checkout(&)
  @pool.retry do
    @pool.checkout do |conn|
      begin
        yield conn
      rescue error : IO::Error
        conn.close
        raise error
      end
    end
  end
end
end
end