From dd3ed57382256b2de5f1bbbcfc2643d14bda9339 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Wed, 7 Jan 2026 22:46:56 -0500 Subject: [PATCH 1/6] Add support for RESP3 types --- spec/parser_spec.cr | 86 ++++++++++++++++++++++++++++++++++++++++++--- src/parser.cr | 44 +++++++++++++++++++++++ src/value.cr | 34 +++++++++++++++++- 3 files changed, 159 insertions(+), 5 deletions(-) diff --git a/spec/parser_spec.cr b/spec/parser_spec.cr index 7189fa3..dddc8f9 100644 --- a/spec/parser_spec.cr +++ b/spec/parser_spec.cr @@ -4,14 +4,14 @@ require "../src/parser" module Redis describe Parser do - it "reads ints" do + it "reads ints as Int64" do [1, 12, 1234, 12345678, 123456789012345678_i64, -1, -12345678, 0].each do |int| io = IO::Memory.new(":#{int}\r\n") Parser.new(io).read.should eq int end end - it "reads simple strings" do + it "reads simple strings as String" do io = IO::Memory.new("+OK\r\n+QUEUED\r\n+.\r\n+\r\n" * 2) parser = Parser.new(io) @@ -25,7 +25,7 @@ module Redis end end - it "reads bulk strings" do + it "reads bulk strings as String" do io = IO::Memory.new("$11\r\nHello world\r\n") Parser.new(io).read.should eq "Hello world" @@ -41,7 +41,7 @@ module Redis Parser.new(io).read.should eq nil end - it "reads arrays" do + it "reads arrays as Array" do io = IO::Memory.new io << "*3\r\n" io << "$4\r\n" # Bulk string, length 4 @@ -62,6 +62,84 @@ module Redis Parser.new(io.rewind).read.should eq ["foo", Error.new("OOPS The thing broke"), 1234] end + it "reads nil" do + io = IO::Memory.new("_\r\n") + + Parser.new(io).read.should eq nil + end + + it "reads doubles as Float64" do + io = IO::Memory.new(",12.34\r\n,56.78\r\n,inf\r\n,-inf\r\n,nan\r\n,1.234567890e4\r\n") + parser = Parser.new(io) + + parser.read.should eq 12.34 + parser.read.should eq 56.78 + parser.read.should eq Float64::INFINITY + parser.read.should eq -Float64::INFINITY + parser.read.as(Float).nan?.should eq true + parser.read.should eq 1.234567890e4 + end + + it "reads booleans as Bool" do + io = IO::Memory.new("#t\r\n#f\r\n") + parser = Parser.new(io) + + parser.read.should eq true + parser.read.should eq false + end + + it "reads blob errors as Redis::Error" do + io = IO::Memory.new("!15\r\nOMG Hello World!\r\n!11\r\nOK Computer\r\n") + parser = Parser.new(io) + + parser.read.should eq Error.new("OMG Hello World!") + parser.read.should eq Error.new("OK Computer") + end + + it "reads verbatim strings as String" do + io = IO::Memory.new("=9\r\ntxt:Hello\r\n=10\r\nmkd:World!\r\n") + parser = Parser.new(io) + + parser.read.should eq "Hello" + parser.read.should eq "World!" + end + + it "reads big numbers" do + io = IO::Memory.new("(123456789012345678901234567890\r\n(98765432109876543210987654321\r\n") + parser = Parser.new(io) + + parser.read.should eq BigInt.new("123456789012345678901234567890") + parser.read.should eq BigInt.new("98765432109876543210987654321") + end + + it "reads maps as Hash(Redis::Value, Redis::Value)" do + io = IO::Memory.new("%2\r\n+one\r\n:1\r\n$3\r\ntwo\r\n:2\r\n%1\r\n+foo\r\n+bar\r\n") + parser = Parser.new(io) + + parser.read.should eq({ + "one" => 1, + "two" => 2, + }) + parser.read.should eq({"foo" => "bar"}) + end + + it "reads sets as Set(Redis::Value)" do + io = IO::Memory.new("~3\r\n+one\r\n:2\r\n(3\r\n") + parser = Parser.new(io) + + parser.read.should eq Set(Value){"one", 2i64, BigInt.new("3")} + end + + it "reads attributes" do + io = IO::Memory.new("|1\r\n+foo\r\n+bar\r\n|2\r\n+one\r\n:1\r\n+two\r\n:2\r\n") + parser = Parser.new(io) + + parser.read.should eq Attributes.new({"foo" => "bar"} of Value => Value) + parser.read.should eq Attributes.new({"one" => 1i64, "two" => 2i64} of Value => Value) + end + + + it "can read without failing if the IO is closed" do reader, writer = IO.pipe begin diff --git a/src/parser.cr b/src/parser.cr index 47a8ad8..f6008d7 100644 --- a/src/parser.cr +++ b/src/parser.cr @@ -1,3 +1,5 @@ +require "big/big_int" + require "./value" require "./errors" @@ -33,12 +35,32 @@ module Redis case byte_marker = @io.read_byte when ':' parse_int.tap { @io.skip 2 } + when '(' + BigInt.new(@io.read_line) when '*' length = parse_int @io.skip 2 if length >= 0 Array(Value).new(length) { read } end + when '%' + size = parse_int + @io.skip 2 + hash = Hash(Value, Value).new(initial_capacity: size) + size.times { hash[read] = read } + hash + when '~' + size = parse_int + @io.skip 2 + set = Set(Value).new(size) + size.times { set << read } + set + when '|' + size = parse_int + @io.skip 2 + hash = Hash(Value, Value).new(initial_capacity: size) + size.times { hash[read] = read } + Attributes.new hash when '$' length = parse_int @io.skip 2 @@ -47,6 +69,13 @@ module Redis @io.skip 2 # Skip CRLF value end + when '=' + length = parse_int + @io.skip 2 + @io.skip 4 + value = @io.read_string length - 4 + @io.skip 2 + value when '+' # Most of the time, RESP simple strings are just "OK", so we can # optimize for that case to avoid heap allocations. If it is *not* the @@ -66,6 +95,21 @@ module Redis str << @io.read_line end.chomp end + when '_' + @io.skip 2 + nil + when ',' + @io.read_line.to_f + when '#' + value = @io.read_char == 't' + @io.skip 2 + value + when '!' + length = parse_int + @io.skip 2 + type = @io.read_line(' ', chomp: true) + message = @io.read_line + ERROR_MAP[type].new("#{type} #{message}") when '-' type, message = @io.read_line.split(' ', 2) ERROR_MAP[type].new("#{type} #{message}") diff --git a/src/value.cr b/src/value.cr index acfa792..9306d27 100644 --- a/src/value.cr +++ b/src/value.cr @@ -3,5 +3,37 @@ require "./errors" module Redis # Values consumed and emitted by Redis can be strings, 64-bit integers, `nil`, # or an array of any of these types. - alias Value = String | Int64 | Nil | Error | Array(Value) + alias Value = String | + Int64 | + BigInt | + Float64 | + Bool | + Nil | + Error | + Hash(Value, Value) | + Set(Value) | + Attributes | + Array(Value) + + struct Attributes + include Enumerable({Value, Value}) + @hash : Hash(Value, Value) + + def initialize(@hash) + end + + def [](key : String) : Value + @hash[key] + end + + def []?(key : String) : Value + @hash[key]? + end + + def each + @hash.each do |key, value| + yield({key, value}) + end + end + end end From aaf569a90ff48b49fe40a3f462ae7fa70bb586c1 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Tue, 3 Mar 2026 00:54:02 -0500 Subject: [PATCH 2/6] Add spec for `INFO` --- spec/redis_spec.cr | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/spec/redis_spec.cr b/spec/redis_spec.cr index d3cfa31..ad99a1a 100644 --- a/spec/redis_spec.cr +++ b/spec/redis_spec.cr @@ -555,6 +555,11 @@ describe Redis::Client do end end end + + it "gets info about the Redis server" do + redis.info.should be_a Hash(String, String) + redis.info("modules").should be_a String + end end module RedisSpec From 0d3458ddd234fbf686a992a79e3ec6c8689f6692 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Tue, 3 Mar 2026 17:22:01 -0500 Subject: [PATCH 3/6] Load dependency correctly --- src/value.cr | 1 + 1 file changed, 1 insertion(+) diff --git a/src/value.cr b/src/value.cr index 9306d27..d563678 100644 --- a/src/value.cr +++ b/src/value.cr @@ -1,4 +1,5 @@ require "./errors" +require "big/big_int" module Redis # Values consumed and emitted by Redis can be strings, 64-bit integers, `nil`, From a443fb578ff74e4486191430adae02fcb4fb2210 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Tue, 3 Mar 2026 17:25:52 -0500 Subject: [PATCH 4/6] Remove Attributes from Value union type Applications shouldn't be expecting an `Attributes` as a result from running Redis commands. --- spec/parser_spec.cr | 28 +++++++++++++++++++++------- src/parser.cr | 19 +++++++++++-------- src/value.cr | 3 +-- 3 files changed, 33 insertions(+), 17 deletions(-) diff --git a/spec/parser_spec.cr b/spec/parser_spec.cr index dddc8f9..1576b50 100644 --- a/spec/parser_spec.cr +++ b/spec/parser_spec.cr @@ -88,12 +88,26 @@ module Redis parser.read.should eq false end - it "reads blob errors as Redis::Error" do - io = IO::Memory.new("!15\r\nOMG Hello World!\r\n!11\r\nOK Computer\r\n") + it "reads simple errors as Redis::Error" do + io = IO::Memory.new("-OMG The thing broke!") parser = Parser.new(io) - parser.read.should eq Error.new("OMG Hello World!") - parser.read.should eq Error.new("OK Computer") + parser.read.should eq Error.new("OMG The thing broke!") + end + + it "reads blob errors as Redis::Error" do + first_error_msg = "OMG Hello World!\r\nSecond line" + second_error_msg = "OK Computer" + parser = Parser.new(IO::Memory.new(<<-EOF)) + !#{first_error_msg.bytesize}\r + #{first_error_msg}\r + !#{second_error_msg.bytesize}\r + #{second_error_msg}\r + + EOF + + parser.read.should eq Error.new(first_error_msg) + parser.read.should eq Error.new(second_error_msg) end it "reads verbatim strings as String" do @@ -116,7 +130,9 @@ module Redis io = IO::Memory.new("%2\r\n+one\r\n:1\r\n$3\r\ntwo\r\n:2\r\n%1\r\n+foo\r\n+bar\r\n") parser = Parser.new(io) - parser.read.should eq({ + hash = parser.read + hash.should be_a Hash(Redis::Value, Redis::Value) + hash.should eq({ "one" => 1, "two" => 2, }) @@ -138,8 +154,6 @@ module Redis parser.read.should eq Attributes.new({"one" => 1i64, "two" => 2i64} of Value => Value) end - - it "can read without failing if the IO is closed" do reader, writer = IO.pipe begin diff --git a/src/parser.cr b/src/parser.cr index f6008d7..8725486 100644 --- a/src/parser.cr +++ b/src/parser.cr @@ -21,7 +21,7 @@ module Redis # # Parser.new(io).read # => "foo" # ``` - def read : Value + def read read { raise IO::Error.new("Connection closed") } end @@ -41,25 +41,25 @@ module Redis length = parse_int @io.skip 2 if length >= 0 - Array(Value).new(length) { read } + Array(Value).new(length) { read.as(Value) } end when '%' size = parse_int @io.skip 2 hash = Hash(Value, Value).new(initial_capacity: size) - size.times { hash[read] = read } + size.times { hash[read.as(Value)] = read.as(Value) } hash when '~' size = parse_int @io.skip 2 set = Set(Value).new(size) - size.times { set << read } + size.times { set << read.as(Value) } set when '|' size = parse_int @io.skip 2 hash = Hash(Value, Value).new(initial_capacity: size) - size.times { hash[read] = read } + size.times { hash[read.as(Value)] = read.as(Value) } Attributes.new hash when '$' length = parse_int @@ -107,9 +107,12 @@ module Redis when '!' length = parse_int @io.skip 2 - type = @io.read_line(' ', chomp: true) - message = @io.read_line - ERROR_MAP[type].new("#{type} #{message}") + full_error_message = @io.read_string length + @io.skip 2 + if separator_index = full_error_message.index(' ') + type = full_error_message[0...separator_index] + end + ERROR_MAP[type].new(full_error_message) when '-' type, message = @io.read_line.split(' ', 2) ERROR_MAP[type].new("#{type} #{message}") diff --git a/src/value.cr b/src/value.cr index d563678..c14e22c 100644 --- a/src/value.cr +++ b/src/value.cr @@ -13,7 +13,6 @@ module Redis Error | Hash(Value, Value) | Set(Value) | - Attributes | Array(Value) struct Attributes @@ -31,7 +30,7 @@ module Redis @hash[key]? end - def each + def each(&) @hash.each do |key, value| yield({key, value}) end From 921617851ec589b1dd6bab6d7132fb7a214852a2 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Tue, 3 Mar 2026 17:31:19 -0500 Subject: [PATCH 5/6] Handle attributes on the connection --- src/connection.cr | 5 +++++ src/pipeline.cr | 2 +- src/value.cr | 6 +++++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/connection.cr b/src/connection.cr index 37ab619..5b06b9f 100644 --- a/src/connection.cr +++ b/src/connection.cr @@ -21,6 +21,7 @@ module Redis @socket : TCPSocket | OpenSSL::SSL::Socket::Client protected getter parser : Parser getter? closed = false + getter attributes : Attributes = Attributes.new getter log # We receive all connection information in the URI. @@ -351,16 +352,20 @@ module Redis @socket.flush end + # :nodoc: # Read the next value from the server def read case value = @parser.read when Error raise value + when Attributes + attributes.merge! value else value end end + # :nodoc: # Read the next value from the server, returning `nil` if the connection is # closed. def read? diff --git a/src/pipeline.cr b/src/pipeline.cr index 9d991e6..af3046a 100644 --- a/src/pipeline.cr +++ b/src/pipeline.cr @@ -28,7 +28,7 @@ module Redis # and resolve all `Redis::Future`s with them in the order they were sent. def commit @futures.map_with_index do |future, index| - future.resolve(@connection.parser.read) + future.resolve(@connection.read) rescue ex raise ResolutionError.new("Failed reading pipeline item #{index}: #{ex.message.inspect}", cause: ex) end diff --git a/src/value.cr b/src/value.cr index c14e22c..192aed1 100644 --- a/src/value.cr +++ b/src/value.cr @@ -19,7 +19,7 @@ module Redis include Enumerable({Value, Value}) @hash : Hash(Value, Value) - def initialize(@hash) + def initialize(@hash = {} of Value => Value) end def [](key : String) : Value @@ -35,5 +35,9 @@ module Redis yield({key, value}) end end + + def merge!(other : self) + @hash.merge other.@hash + end end end From a29974f958d5d1afb737eef08e0892bc7fe62469 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Tue, 3 Mar 2026 18:21:45 -0500 Subject: [PATCH 6/6] Handle attributes with a block Storing attributes on the connection doesn't seem useful since the Redis server might not send them to all connections and we have no way to guarantee that a `Client` would yield the same `Connection` twice in a row. This way, all attributes on all connections on a given `Client` can be guaranteed to funnel through a single block and it's up to the caller to determine what to do with it. Open question: should we also yield the connection to the client in that block? How does that work for `Cluster` and `ReplicationClient`? --- src/client.cr | 10 ++++++++++ src/connection.cr | 37 ++++++++++++++++++++++--------------- src/pipeline.cr | 9 +++++++-- 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/src/client.cr b/src/client.cr index 29134d4..2104dc7 100644 --- a/src/client.cr +++ b/src/client.cr @@ -26,6 +26,7 @@ module Redis getter uri : URI @pool : DB::Pool(Connection) + @on_attributes = Proc(Attributes, Nil).new { } def self.from_env(env_var) new(URI.parse(ENV[env_var])) @@ -140,6 +141,15 @@ module Redis checkout(&.multi { |txn| yield txn }) end + # Run the given block when any connection managed by this client receives a + # `Redis::Attributes`. + def on_attributes(&@on_attributes : Attributes -> Nil) : self + @pool.each_resource do |connection| + connection.on_attributes(&on_attributes) + end + self + end + # Watch the given `keys` for changes when you need to fetch them for update # in a transaction and yield the connection with that watch active. This # allows for optimistic updates within the transaction, returning `nil` from diff --git a/src/connection.cr b/src/connection.cr index 5b06b9f..18697d3 100644 --- a/src/connection.cr +++ b/src/connection.cr @@ -19,9 +19,9 @@ module Redis include Commands::Immediate @socket : TCPSocket | OpenSSL::SSL::Socket::Client + @on_attributes = Proc(Attributes, Nil).new { } protected getter parser : Parser getter? closed = false - getter attributes : Attributes = Attributes.new getter log # We receive all connection information in the URI. @@ -57,8 +57,9 @@ module Redis pipeline do |redis| # Authentication if (username = uri.user) && (password = uri.password) - redis.run({"auth", username, password}) + redis.run({"hello", "3", "auth", username, password}) elsif password = uri.password + redis.run({"hello", "3"}) redis.run({"auth", password}) end @@ -354,14 +355,16 @@ module Redis # :nodoc: # Read the next value from the server - def read - case value = @parser.read - when Error - raise value - when Attributes - attributes.merge! value - else - value + def read : Value + loop do + case value = @parser.read + when Error + raise value + when Attributes + @on_attributes.call value + else + return value + end end end @@ -369,11 +372,15 @@ module Redis # Read the next value from the server, returning `nil` if the connection is # closed. def read? - case value = @parser.read? - when Error - raise value - else - value + loop do + case value = @parser.read? + when Error + raise value + when Attributes + @on_attributes.call value + else + return value + end end end diff --git a/src/pipeline.cr b/src/pipeline.cr index af3046a..c69041d 100644 --- a/src/pipeline.cr +++ b/src/pipeline.cr @@ -8,11 +8,12 @@ module Redis class Pipeline include Commands include Commands::Deferred + getter connection : Connection @futures = [] of Future # Wraps a connection so that our `run` and `commit` methods can execute against it. - def initialize(@connection : Connection) + def initialize(@connection) end # The `run` method is required by the `Commands` mixin. When you run a Redis @@ -28,7 +29,11 @@ module Redis # and resolve all `Redis::Future`s with them in the order they were sent. def commit @futures.map_with_index do |future, index| - future.resolve(@connection.read) + while (result = @connection.parser.read).is_a? Attributes + @connection.@on_attributes.call result + end + + future.resolve(result) rescue ex raise ResolutionError.new("Failed reading pipeline item #{index}: #{ex.message.inspect}", cause: ex) end