diff --git a/spec/parser_spec.cr b/spec/parser_spec.cr index 7189fa3..1576b50 100644 --- a/spec/parser_spec.cr +++ b/spec/parser_spec.cr @@ -4,14 +4,14 @@ require "../src/parser" module Redis describe Parser do - it "reads ints" do + it "reads ints as Int64" do [1, 12, 1234, 12345678, 123456789012345678_i64, -1, -12345678, 0].each do |int| io = IO::Memory.new(":#{int}\r\n") Parser.new(io).read.should eq int end end - it "reads simple strings" do + it "reads simple strings as String" do io = IO::Memory.new("+OK\r\n+QUEUED\r\n+.\r\n+\r\n" * 2) parser = Parser.new(io) @@ -25,7 +25,7 @@ module Redis end end - it "reads bulk strings" do + it "reads bulk strings as String" do io = IO::Memory.new("$11\r\nHello world\r\n") Parser.new(io).read.should eq "Hello world" @@ -41,7 +41,7 @@ module Redis Parser.new(io).read.should eq nil end - it "reads arrays" do + it "reads arrays as Array" do io = IO::Memory.new io << "*3\r\n" io << "$4\r\n" # Bulk string, length 4 @@ -62,6 +62,98 @@ module Redis Parser.new(io.rewind).read.should eq ["foo", Error.new("OOPS The thing broke"), 1234] end + it "reads nil" do + io = IO::Memory.new("_\r\n") + + Parser.new(io).read.should eq nil + end + + it "reads doubles as Float64" do + io = IO::Memory.new(",12.34\r\n,56.78\r\n,inf\r\n,-inf\r\n,nan\r\n,1.234567890e4\r\n") + parser = Parser.new(io) + + parser.read.should eq 12.34 + parser.read.should eq 56.78 + parser.read.should eq Float64::INFINITY + parser.read.should eq -Float64::INFINITY + parser.read.as(Float).nan?.should eq true + parser.read.should eq 1.234567890e4 + end + + it "reads booleans as Bool" do + io = IO::Memory.new("#t\r\n#f\r\n") + parser = Parser.new(io) + + parser.read.should eq true + parser.read.should eq false + end + + it "reads simple errors as Redis::Error" do + io = IO::Memory.new("-OMG The thing broke!") + parser = Parser.new(io) + + parser.read.should eq Error.new("OMG The thing broke!") + end + + it "reads blob errors as Redis::Error" do + first_error_msg = "OMG Hello World!\r\nSecond line" + second_error_msg = "OK Computer" + parser = Parser.new(IO::Memory.new(<<-EOF)) + !#{first_error_msg.bytesize}\r + #{first_error_msg}\r + !#{second_error_msg.bytesize}\r + #{second_error_msg}\r + + EOF + + parser.read.should eq Error.new(first_error_msg) + parser.read.should eq Error.new(second_error_msg) + end + + it "reads verbatim strings as String" do + io = IO::Memory.new("=9\r\ntxt:Hello\r\n=10\r\nmkd:World!\r\n") + parser = Parser.new(io) + + parser.read.should eq "Hello" + parser.read.should eq "World!" + end + + it "reads big numbers" do + io = IO::Memory.new("(123456789012345678901234567890\r\n(98765432109876543210987654321\r\n") + parser = Parser.new(io) + + parser.read.should eq BigInt.new("123456789012345678901234567890") + parser.read.should eq BigInt.new("98765432109876543210987654321") + end + + it "reads maps as Hash(Redis::Value, Redis::Value)" do + io = IO::Memory.new("%2\r\n+one\r\n:1\r\n$3\r\ntwo\r\n:2\r\n%1\r\n+foo\r\n+bar\r\n") + parser = Parser.new(io) + + hash = parser.read + hash.should be_a Hash(Redis::Value, Redis::Value) + hash.should eq({ + "one" => 1, + "two" => 2, + }) + parser.read.should eq({"foo" => "bar"}) + end + + it "reads sets as Set(Redis::Value)" do + io = IO::Memory.new("~3\r\n+one\r\n:2\r\n(3\r\n") + parser = Parser.new(io) + + parser.read.should eq Set(Value){"one", 2i64, BigInt.new("3")} + end + + it "reads attributes" do + io = IO::Memory.new("|1\r\n+foo\r\n+bar\r\n|2\r\n+one\r\n:1\r\n+two\r\n:2\r\n") + parser = Parser.new(io) + + parser.read.should eq Attributes.new({"foo" => "bar"} of Value => Value) + parser.read.should eq Attributes.new({"one" => 1i64, "two" => 2i64} of Value => Value) + end + it "can read without failing if the IO is closed" do reader, writer = IO.pipe begin diff --git a/src/client.cr b/src/client.cr index 29134d4..2104dc7 100644 --- a/src/client.cr +++ b/src/client.cr @@ -26,6 +26,7 @@ module Redis getter uri : URI @pool : DB::Pool(Connection) + @on_attributes = Proc(Attributes, Nil).new { } def self.from_env(env_var) new(URI.parse(ENV[env_var])) @@ -140,6 +141,15 @@ module Redis checkout(&.multi { |txn| yield txn }) end + # Run the given block when any connection managed by this client receives a + # `Redis::Attributes`. + def on_attributes(&@on_attributes : Attributes -> Nil) : self + @pool.each_resource do |connection| + connection.on_attributes(&on_attributes) + end + self + end + # Watch the given `keys` for changes when you need to fetch them for update # in a transaction and yield the connection with that watch active. This # allows for optimistic updates within the transaction, returning `nil` from diff --git a/src/connection.cr b/src/connection.cr index 7461917..3f5c348 100644 --- a/src/connection.cr +++ b/src/connection.cr @@ -19,6 +19,7 @@ module Redis include Commands::Immediate @socket : TCPSocket | OpenSSL::SSL::Socket::Client + @on_attributes = Proc(Attributes, Nil).new { } protected getter parser : Parser getter? closed = false getter log @@ -56,8 +57,9 @@ module Redis pipeline do |redis| # Authentication if (username = uri.user) && (password = uri.password) - redis.run({"auth", username, password}) + redis.run({"hello", "3", "auth", username, password}) elsif password = uri.password + redis.run({"hello", "3"}) redis.run({"auth", password}) end @@ -359,24 +361,34 @@ module Redis @socket.flush end + # :nodoc: # Read the next value from the server - def read - case value = @parser.read - when Error - raise value - else - value + def read : Value + loop do + case value = @parser.read + when Error + raise value + when Attributes + @on_attributes.call value + else + return value + end end end + # :nodoc: # Read the next value from the server, returning `nil` if the connection is # closed. def read? - case value = @parser.read? - when Error - raise value - else - value + loop do + case value = @parser.read? + when Error + raise value + when Attributes + @on_attributes.call value + else + return value + end end end diff --git a/src/parser.cr b/src/parser.cr index 47a8ad8..8725486 100644 --- a/src/parser.cr +++ b/src/parser.cr @@ -1,3 +1,5 @@ +require "big/big_int" + require "./value" require "./errors" @@ -19,7 +21,7 @@ module Redis # # Parser.new(io).read # => "foo" # ``` - def read : Value + def read read { raise IO::Error.new("Connection closed") } end @@ -33,12 +35,32 @@ module Redis case byte_marker = @io.read_byte when ':' parse_int.tap { @io.skip 2 } + when '(' + BigInt.new(@io.read_line) when '*' length = parse_int @io.skip 2 if length >= 0 - Array(Value).new(length) { read } + Array(Value).new(length) { read.as(Value) } end + when '%' + size = parse_int + @io.skip 2 + hash = Hash(Value, Value).new(initial_capacity: size) + size.times { hash[read.as(Value)] = read.as(Value) } + hash + when '~' + size = parse_int + @io.skip 2 + set = Set(Value).new(size) + size.times { set << read.as(Value) } + set + when '|' + size = parse_int + @io.skip 2 + hash = Hash(Value, Value).new(initial_capacity: size) + size.times { hash[read.as(Value)] = read.as(Value) } + Attributes.new hash when '$' length = parse_int @io.skip 2 @@ -47,6 +69,13 @@ module Redis @io.skip 2 # Skip CRLF value end + when '=' + length = parse_int + @io.skip 2 + @io.skip 4 + value = @io.read_string length - 4 + @io.skip 2 + value when '+' # Most of the time, RESP simple strings are just "OK", so we can # optimize for that case to avoid heap allocations. If it is *not* the @@ -66,6 +95,24 @@ module Redis str << @io.read_line end.chomp end + when '_' + @io.skip 2 + nil + when ',' + @io.read_line.to_f + when '#' + value = @io.read_char == 't' + @io.skip 2 + value + when '!' + length = parse_int + @io.skip 2 + full_error_message = @io.read_string length + @io.skip 2 + if separator_index = full_error_message.index(' ') + type = full_error_message[0...separator_index] + end + ERROR_MAP[type].new(full_error_message) when '-' type, message = @io.read_line.split(' ', 2) ERROR_MAP[type].new("#{type} #{message}") diff --git a/src/pipeline.cr b/src/pipeline.cr index 9d991e6..c69041d 100644 --- a/src/pipeline.cr +++ b/src/pipeline.cr @@ -8,11 +8,12 @@ module Redis class Pipeline include Commands include Commands::Deferred + getter connection : Connection @futures = [] of Future # Wraps a connection so that our `run` and `commit` methods can execute against it. - def initialize(@connection : Connection) + def initialize(@connection) end # The `run` method is required by the `Commands` mixin. When you run a Redis @@ -28,7 +29,11 @@ module Redis # and resolve all `Redis::Future`s with them in the order they were sent. def commit @futures.map_with_index do |future, index| - future.resolve(@connection.parser.read) + while (result = @connection.parser.read).is_a? Attributes + @connection.@on_attributes.call result + end + + future.resolve(result) rescue ex raise ResolutionError.new("Failed reading pipeline item #{index}: #{ex.message.inspect}", cause: ex) end diff --git a/src/value.cr b/src/value.cr index acfa792..192aed1 100644 --- a/src/value.cr +++ b/src/value.cr @@ -1,7 +1,43 @@ require "./errors" +require "big/big_int" module Redis # Values consumed and emitted by Redis can be strings, 64-bit integers, `nil`, # or an array of any of these types. - alias Value = String | Int64 | Nil | Error | Array(Value) + alias Value = String | + Int64 | + BigInt | + Float64 | + Bool | + Nil | + Error | + Hash(Value, Value) | + Set(Value) | + Array(Value) + + struct Attributes + include Enumerable({Value, Value}) + @hash : Hash(Value, Value) + + def initialize(@hash = {} of Value => Value) + end + + def [](key : String) : Value + @hash[key] + end + + def []?(key : String) : Value + @hash[key]? + end + + def each(&) + @hash.each do |key, value| + yield({key, value}) + end + end + + def merge!(other : self) + @hash.merge other.@hash + end + end end