Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 96 additions & 4 deletions spec/parser_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ require "../src/parser"

module Redis
describe Parser do
it "reads ints" do
it "reads ints as Int64" do
[1, 12, 1234, 12345678, 123456789012345678_i64, -1, -12345678, 0].each do |int|
io = IO::Memory.new(":#{int}\r\n")
Parser.new(io).read.should eq int
end
end

it "reads simple strings" do
it "reads simple strings as String" do
io = IO::Memory.new("+OK\r\n+QUEUED\r\n+.\r\n+\r\n" * 2)
parser = Parser.new(io)

Expand All @@ -25,7 +25,7 @@ module Redis
end
end

it "reads bulk strings" do
it "reads bulk strings as String" do
io = IO::Memory.new("$11\r\nHello world\r\n")
Parser.new(io).read.should eq "Hello world"

Expand All @@ -41,7 +41,7 @@ module Redis
Parser.new(io).read.should eq nil
end

it "reads arrays" do
it "reads arrays as Array" do
io = IO::Memory.new
io << "*3\r\n"
io << "$4\r\n" # Bulk string, length 4
Expand All @@ -62,6 +62,98 @@ module Redis
Parser.new(io.rewind).read.should eq ["foo", Error.new("OOPS The thing broke"), 1234]
end

it "reads nil" do
io = IO::Memory.new("_\r\n")

Parser.new(io).read.should eq nil
end

it "reads doubles as Float64" do
io = IO::Memory.new(",12.34\r\n,56.78\r\n,inf\r\n,-inf\r\n,nan\r\n,1.234567890e4\r\n")
parser = Parser.new(io)

parser.read.should eq 12.34
parser.read.should eq 56.78
parser.read.should eq Float64::INFINITY
parser.read.should eq -Float64::INFINITY
parser.read.as(Float).nan?.should eq true
parser.read.should eq 1.234567890e4
end

it "reads booleans as Bool" do
io = IO::Memory.new("#t\r\n#f\r\n")
parser = Parser.new(io)

parser.read.should eq true
parser.read.should eq false
end

it "reads simple errors as Redis::Error" do
io = IO::Memory.new("-OMG The thing broke!")
parser = Parser.new(io)

parser.read.should eq Error.new("OMG The thing broke!")
end

it "reads blob errors as Redis::Error" do
first_error_msg = "OMG Hello World!\r\nSecond line"
second_error_msg = "OK Computer"
parser = Parser.new(IO::Memory.new(<<-EOF))
!#{first_error_msg.bytesize}\r
#{first_error_msg}\r
!#{second_error_msg.bytesize}\r
#{second_error_msg}\r

EOF

parser.read.should eq Error.new(first_error_msg)
parser.read.should eq Error.new(second_error_msg)
end

it "reads verbatim strings as String" do
io = IO::Memory.new("=9\r\ntxt:Hello\r\n=10\r\nmkd:World!\r\n")
parser = Parser.new(io)

parser.read.should eq "Hello"
parser.read.should eq "World!"
end

it "reads big numbers" do
io = IO::Memory.new("(123456789012345678901234567890\r\n(98765432109876543210987654321\r\n")
parser = Parser.new(io)

parser.read.should eq BigInt.new("123456789012345678901234567890")
parser.read.should eq BigInt.new("98765432109876543210987654321")
end

it "reads maps as Hash(Redis::Value, Redis::Value)" do
io = IO::Memory.new("%2\r\n+one\r\n:1\r\n$3\r\ntwo\r\n:2\r\n%1\r\n+foo\r\n+bar\r\n")
parser = Parser.new(io)

hash = parser.read
hash.should be_a Hash(Redis::Value, Redis::Value)
hash.should eq({
"one" => 1,
"two" => 2,
})
parser.read.should eq({"foo" => "bar"})
end

it "reads sets as Set(Redis::Value)" do
io = IO::Memory.new("~3\r\n+one\r\n:2\r\n(3\r\n")
parser = Parser.new(io)

parser.read.should eq Set(Value){"one", 2i64, BigInt.new("3")}
end

it "reads attributes" do
io = IO::Memory.new("|1\r\n+foo\r\n+bar\r\n|2\r\n+one\r\n:1\r\n+two\r\n:2\r\n")
parser = Parser.new(io)

parser.read.should eq Attributes.new({"foo" => "bar"} of Value => Value)
parser.read.should eq Attributes.new({"one" => 1i64, "two" => 2i64} of Value => Value)
end

it "can read without failing if the IO is closed" do
reader, writer = IO.pipe
begin
Expand Down
10 changes: 10 additions & 0 deletions src/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ module Redis

getter uri : URI
@pool : DB::Pool(Connection)
@on_attributes = Proc(Attributes, Nil).new { }

def self.from_env(env_var)
new(URI.parse(ENV[env_var]))
Expand Down Expand Up @@ -140,6 +141,15 @@ module Redis
checkout(&.multi { |txn| yield txn })
end

# Run the given block when any connection managed by this client receives a
# `Redis::Attributes`.
def on_attributes(&@on_attributes : Attributes -> Nil) : self
@pool.each_resource do |connection|
connection.on_attributes(&on_attributes)
end
self
end

# Watch the given `keys` for changes when you need to fetch them for update
# in a transaction and yield the connection with that watch active. This
# allows for optimistic updates within the transaction, returning `nil` from
Expand Down
36 changes: 24 additions & 12 deletions src/connection.cr
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module Redis
include Commands::Immediate

@socket : TCPSocket | OpenSSL::SSL::Socket::Client
@on_attributes = Proc(Attributes, Nil).new { }
protected getter parser : Parser
getter? closed = false
getter log
Expand Down Expand Up @@ -56,8 +57,9 @@ module Redis
pipeline do |redis|
# Authentication
if (username = uri.user) && (password = uri.password)
redis.run({"auth", username, password})
redis.run({"hello", "3", "auth", username, password})
elsif password = uri.password
redis.run({"hello", "3"})
redis.run({"auth", password})
end

Expand Down Expand Up @@ -359,24 +361,34 @@ module Redis
@socket.flush
end

# :nodoc:
# Read the next value from the server
def read
case value = @parser.read
when Error
raise value
else
value
def read : Value
loop do
case value = @parser.read
when Error
raise value
when Attributes
@on_attributes.call value
else
return value
end
end
end

# :nodoc:
# Read the next value from the server, returning `nil` if the connection is
# closed.
def read?
case value = @parser.read?
when Error
raise value
else
value
loop do
case value = @parser.read?
when Error
raise value
when Attributes
@on_attributes.call value
else
return value
end
end
end

Expand Down
51 changes: 49 additions & 2 deletions src/parser.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require "big/big_int"

require "./value"
require "./errors"

Expand All @@ -19,7 +21,7 @@ module Redis
#
# Parser.new(io).read # => "foo"
# ```
def read : Value
def read
read { raise IO::Error.new("Connection closed") }
end

Expand All @@ -33,12 +35,32 @@ module Redis
case byte_marker = @io.read_byte
when ':'
parse_int.tap { @io.skip 2 }
when '('
BigInt.new(@io.read_line)
when '*'
length = parse_int
@io.skip 2
if length >= 0
Array(Value).new(length) { read }
Array(Value).new(length) { read.as(Value) }
end
when '%'
size = parse_int
@io.skip 2
hash = Hash(Value, Value).new(initial_capacity: size)
size.times { hash[read.as(Value)] = read.as(Value) }
hash
when '~'
size = parse_int
@io.skip 2
set = Set(Value).new(size)
size.times { set << read.as(Value) }
set
when '|'
size = parse_int
@io.skip 2
hash = Hash(Value, Value).new(initial_capacity: size)
size.times { hash[read.as(Value)] = read.as(Value) }
Attributes.new hash
when '$'
length = parse_int
@io.skip 2
Expand All @@ -47,6 +69,13 @@ module Redis
@io.skip 2 # Skip CRLF
value
end
when '='
length = parse_int
@io.skip 2
@io.skip 4
value = @io.read_string length - 4
@io.skip 2
value
when '+'
# Most of the time, RESP simple strings are just "OK", so we can
# optimize for that case to avoid heap allocations. If it is *not* the
Expand All @@ -66,6 +95,24 @@ module Redis
str << @io.read_line
end.chomp
end
when '_'
@io.skip 2
nil
when ','
@io.read_line.to_f
when '#'
value = @io.read_char == 't'
@io.skip 2
value
when '!'
length = parse_int
@io.skip 2
full_error_message = @io.read_string length
@io.skip 2
if separator_index = full_error_message.index(' ')
type = full_error_message[0...separator_index]
end
ERROR_MAP[type].new(full_error_message)
when '-'
type, message = @io.read_line.split(' ', 2)
ERROR_MAP[type].new("#{type} #{message}")
Expand Down
9 changes: 7 additions & 2 deletions src/pipeline.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ module Redis
class Pipeline
include Commands
include Commands::Deferred
getter connection : Connection

@futures = [] of Future

# Wraps a connection so that our `run` and `commit` methods can execute against it.
def initialize(@connection : Connection)
def initialize(@connection)
end

# The `run` method is required by the `Commands` mixin. When you run a Redis
Expand All @@ -28,7 +29,11 @@ module Redis
# and resolve all `Redis::Future`s with them in the order they were sent.
def commit
@futures.map_with_index do |future, index|
future.resolve(@connection.parser.read)
while (result = @connection.parser.read).is_a? Attributes
@connection.@on_attributes.call result
end

future.resolve(result)
rescue ex
raise ResolutionError.new("Failed reading pipeline item #{index}: #{ex.message.inspect}", cause: ex)
end
Expand Down
38 changes: 37 additions & 1 deletion src/value.cr
Original file line number Diff line number Diff line change
@@ -1,7 +1,43 @@
require "./errors"
require "big/big_int"

module Redis
# Values consumed and emitted by Redis can be strings, 64-bit integers, `nil`,
# or an array of any of these types.
alias Value = String | Int64 | Nil | Error | Array(Value)
alias Value = String |
Int64 |
BigInt |
Float64 |
Bool |
Nil |
Error |
Hash(Value, Value) |
Set(Value) |
Array(Value)

struct Attributes
include Enumerable({Value, Value})
@hash : Hash(Value, Value)

def initialize(@hash = {} of Value => Value)
end

def [](key : String) : Value
@hash[key]
end

def []?(key : String) : Value
@hash[key]?
end

def each(&)
@hash.each do |key, value|
yield({key, value})
end
end

def merge!(other : self)
@hash.merge other.@hash
end
end
end