From 4051d6a5f28ed248d90e7111e2f900e4b42934cb Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Wed, 15 Nov 2023 11:15:08 +1100 Subject: [PATCH 1/7] Implement RedisClient::Cluster::Command#extract_all_keys We need this in order to get the list of keys from a WATCH command. It's probably a little overkill, but may as well implemenet the whole thing. --- lib/redis_client/cluster/command.rb | 51 +++++++++++++++++++++++ test/redis_client/cluster/test_command.rb | 43 ++++++++++++++++--- 2 files changed, 88 insertions(+), 6 deletions(-) diff --git a/lib/redis_client/cluster/command.rb b/lib/redis_client/cluster/command.rb index 05460482..4de5e2d6 100644 --- a/lib/redis_client/cluster/command.rb +++ b/lib/redis_client/cluster/command.rb @@ -11,10 +11,13 @@ class Command LEFT_BRACKET = '{' RIGHT_BRACKET = '}' EMPTY_HASH = {}.freeze + EMPTY_ARRAY = [].freeze Detail = Struct.new( 'RedisCommand', :first_key_position, + :last_key_position, + :key_step, :write?, :readonly?, keyword_init: true @@ -50,6 +53,8 @@ def parse_command_reply(rows) acc[row[0].downcase] = ::RedisClient::Cluster::Command::Detail.new( first_key_position: row[3], + last_key_position: row[4], + key_step: row[5], write?: row[2].include?('write'), readonly?: row[2].include?('readonly') ) @@ -70,6 +75,17 @@ def extract_first_key(command) hash_tag.empty? ? key : hash_tag end + def extract_all_keys(command) + keys_start = determine_first_key_position(command) + keys_end = determine_last_key_position(command, keys_start) + keys_step = determine_key_step(command) + return EMPTY_ARRAY if [keys_start, keys_end, keys_step].any?(&:zero?) + + keys_end = [keys_end, command.size - 1].min + # use .. inclusive range because keys_end is a valid index. + (keys_start..keys_end).step(keys_step).map { |i| command[i] } + end + def should_send_to_primary?(command) name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command) @commands[name]&.write? @@ -101,6 +117,41 @@ def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticCo end end + # IMPORTANT: this determines the last key position INCLUSIVE of the last key - + # i.e. command[determine_last_key_position(command)] is a key. + # This is in line with what Redis returns from COMMANDS. + def determine_last_key_position(command, keys_start) # rubocop:disable Metrics/AbcSize + case name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command) + when 'eval', 'evalsha', 'zinterstore', 'zunionstore' + # EVALSHA sha1 numkeys [key [key ...]] [arg [arg ...]] + # ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE ] + command[2].to_i + 2 + when 'object', 'memory' + # OBJECT [ENCODING | FREQ | IDLETIME | REFCOUNT] key + # MEMORY USAGE key [SAMPLES count] + keys_start + when 'migrate' + # MIGRATE host port destination-db timeout [COPY] [REPLACE] [AUTH password | AUTH2 username password] [KEYS key [key ...]] + command[3].empty? ? (command.length - 1) : 3 + when 'xread', 'xreadgroup' + # XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...] + keys_start + ((command.length - keys_start) / 2) - 1 + else + # If there is a fixed, non-variable number of keys, don't iterate past that. 
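+          # A non-negative last_key_position is an absolute index (e.g. GET reports 1/1/1,
+          # so only command[1] is a key), while a negative one counts back from the end of
+          # the command (e.g. MSET reports 1/-1/2: keys run to the last argument, taking
+          # every second element).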
+ if @commands[name].last_key_position >= 0 + @commands[name].last_key_position + else + command.length + @commands[name].last_key_position + end + end + end + + def determine_key_step(command) + name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command) + # Some commands like EVALSHA have zero as the step in COMMANDS somehow. + @commands[name].key_step == 0 ? 1 : @commands[name].key_step + end + def determine_optional_key_position(command, option_name) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity idx = command&.flatten&.map(&:to_s)&.map(&:downcase)&.index(option_name&.downcase) idx.nil? ? 0 : idx + 1 diff --git a/test/redis_client/cluster/test_command.rb b/test/redis_client/cluster/test_command.rb index b34bb2ae..d3d1f211 100644 --- a/test/redis_client/cluster/test_command.rb +++ b/test/redis_client/cluster/test_command.rb @@ -49,20 +49,20 @@ def test_parse_command_reply [ { rows: [ - ['get', 2, Set['readonly', 'fast'], 1, 1, 1, Set['@read', '@string', '@fast'], Set[], Set[], Set[]], - ['set', -3, Set['write', 'denyoom', 'movablekeys'], 1, 1, 1, Set['@write', '@string', '@slow'], Set[], Set[], Set[]] + ['get', 2, Set['readonly', 'fast'], 1, -1, 1, Set['@read', '@string', '@fast'], Set[], Set[], Set[]], + ['set', -3, Set['write', 'denyoom', 'movablekeys'], 1, -1, 2, Set['@write', '@string', '@slow'], Set[], Set[], Set[]] ], want: { - 'get' => { first_key_position: 1, write?: false, readonly?: true }, - 'set' => { first_key_position: 1, write?: true, readonly?: false } + 'get' => { first_key_position: 1, last_key_position: -1, key_step: 1, write?: false, readonly?: true }, + 'set' => { first_key_position: 1, last_key_position: -1, key_step: 2, write?: true, readonly?: false } } }, { rows: [ - ['GET', 2, Set['readonly', 'fast'], 1, 1, 1, Set['@read', '@string', '@fast'], Set[], Set[], Set[]] + ['GET', 2, Set['readonly', 'fast'], 1, -1, 1, Set['@read', '@string', '@fast'], Set[], Set[], Set[]] ], want: { - 'get' => { first_key_position: 1, write?: false, readonly?: true } + 'get' => { first_key_position: 1, last_key_position: -1, key_step: 1, write?: false, readonly?: true } } }, { rows: [[]], want: {} }, @@ -212,6 +212,37 @@ def test_extract_hash_tag assert_equal(c[:want], got, msg) end end + + def test_extract_all_keys + cmd = ::RedisClient::Cluster::Command.load(@raw_clients) + [ + { command: ['EVAL', 'return ARGV[1]', '0', 'hello'], want: [] }, + { command: ['EVAL', 'return ARGV[1]', '3', 'key1', 'key2', 'key3', 'arg1', 'arg2'], want: %w[key1 key2 key3] }, + { command: [['EVAL'], '"return ARGV[1]"', 0, 'hello'], want: [] }, + { command: %w[EVALSHA sha1 2 foo bar baz zap], want: %w[foo bar] }, + { command: %w[MIGRATE host port key 0 5 COPY], want: %w[key] }, + { command: ['MIGRATE', 'host', 'port', '', '0', '5', 'COPY', 'KEYS', 'key1'], want: %w[key1] }, + { command: ['MIGRATE', 'host', 'port', '', '0', '5', 'COPY', 'KEYS', 'key1', 'key2'], want: %w[key1 key2] }, + { command: %w[ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3], want: %w[zset1 zset2] }, + { command: %w[ZUNIONSTORE out 2 zset1 zset2 WEIGHTS 2 3], want: %w[zset1 zset2] }, + { command: %w[OBJECT HELP], want: [] }, + { command: %w[MEMORY HELP], want: [] }, + { command: %w[MEMORY USAGE key], want: %w[key] }, + { command: %w[XREAD COUNT 2 STREAMS mystream writers 0-0 0-0], want: %w[mystream writers] }, + { command: %w[XREADGROUP GROUP group consumer STREAMS key id], want: %w[key] }, + { command: %w[SET foo 1], want: %w[foo] }, + { command: %w[set foo 1], want: %w[foo] }, + { 
command: [['SET'], 'foo', 1], want: %w[foo] }, + { command: %w[GET foo], want: %w[foo] }, + { command: %w[MGET foo bar baz], want: %w[foo bar baz] }, + { command: %w[MSET foo val bar val baz val], want: %w[foo bar baz] }, + { command: %w[BLPOP foo bar 0], want: %w[foo bar] } + ].each_with_index do |c, idx| + msg = "Case: #{idx}" + got = cmd.send(:extract_all_keys, c[:command]) + assert_equal(c[:want], got, msg) + end + end end end end From d82a13e806b74972d99033769d300239e3b682da Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Tue, 21 Nov 2023 10:55:12 +1100 Subject: [PATCH 2/7] Perform cluster bookeeping tasks _before_ determining retry Just because a block is not going to be retried, does not mean we should not process topology updates & redirections in response to errors, I think. --- lib/redis_client/cluster/router.rb | 31 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index 11c6e22b..dd0e245e 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -83,60 +83,57 @@ def try_send(node, method, command, args, retry_count: 3, &block) # rubocop:disa rescue ::RedisClient::CircuitBreaker::OpenCircuitError raise rescue ::RedisClient::CommandError => e - raise if retry_count <= 0 - if e.message.start_with?('MOVED') node = assign_redirection_node(e.message) retry_count -= 1 - retry + retry if retry_count >= 0 elsif e.message.start_with?('ASK') node = assign_asking_node(e.message) node.call('ASKING') retry_count -= 1 - retry + retry if retry_count >= 0 elsif e.message.start_with?('CLUSTERDOWN Hash slot not served') update_cluster_info! retry_count -= 1 - retry - else - raise + retry if retry_count >= 0 end + raise rescue ::RedisClient::ConnectionError => e raise if METHODS_FOR_BLOCKING_CMD.include?(method) && e.is_a?(RedisClient::ReadTimeoutError) - raise if retry_count <= 0 update_cluster_info! + + raise if retry_count <= 0 + retry_count -= 1 retry end - def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block) # rubocop:disable Metrics/AbcSize + def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity node.public_send(method, *args, **kwargs, &block) rescue ::RedisClient::CircuitBreaker::OpenCircuitError raise rescue ::RedisClient::CommandError => e - raise if retry_count <= 0 - if e.message.start_with?('MOVED') node = assign_redirection_node(e.message) retry_count -= 1 - retry + retry if retry_count >= 0 elsif e.message.start_with?('ASK') node = assign_asking_node(e.message) node.call('ASKING') retry_count -= 1 - retry + retry if retry_count >= 0 elsif e.message.start_with?('CLUSTERDOWN Hash slot not served') update_cluster_info! retry_count -= 1 - retry - else - raise + retry if retry_count >= 0 end + raise rescue ::RedisClient::ConnectionError + update_cluster_info! + raise if retry_count <= 0 - update_cluster_info! retry_count -= 1 retry end From b639c438d9cb1c48f297d040675a65bf90e5acb9 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Tue, 21 Nov 2023 15:11:58 +1100 Subject: [PATCH 3/7] Refactor key-slot conversion logic Previously, extract_first_key would perform hashtag extraction on the key it pulled from command; that meant that the "key" it was returning might not actually be the key in the command. 
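For example, under the old behaviour:

    extract_first_key(%w[GET foo{bar}baz]) #=> "bar" (the hash tag, not the key)

whereas after this change the same call returns "foo{bar}baz", and the hash tag
handling happens inside KeySlotConverter.convert instead.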
This commit refactors things so that * extract_first_key actually extracts the first key * KeySlotConverter.convert does hashtag extraction * Router's find_node_key and find_primary_node_key can now be implemented in terms of a function "find_node_by_key", because they _actually_ get the key from the command. --- lib/redis_client/cluster/command.rb | 19 ++------------- .../cluster/key_slot_converter.rb | 18 ++++++++++++++ lib/redis_client/cluster/router.rb | 22 ++++++++++------- test/redis_client/cluster/test_command.rb | 24 +------------------ .../cluster/test_key_slot_converter.rb | 21 ++++++++++++++++ 5 files changed, 55 insertions(+), 49 deletions(-) diff --git a/lib/redis_client/cluster/command.rb b/lib/redis_client/cluster/command.rb index 4de5e2d6..dd7ca683 100644 --- a/lib/redis_client/cluster/command.rb +++ b/lib/redis_client/cluster/command.rb @@ -2,14 +2,13 @@ require 'redis_client' require 'redis_client/cluster/errors' +require 'redis_client/cluster/key_slot_converter' require 'redis_client/cluster/normalized_cmd_name' class RedisClient class Cluster class Command EMPTY_STRING = '' - LEFT_BRACKET = '{' - RIGHT_BRACKET = '}' EMPTY_HASH = {}.freeze EMPTY_ARRAY = [].freeze @@ -70,9 +69,7 @@ def extract_first_key(command) i = determine_first_key_position(command) return EMPTY_STRING if i == 0 - key = (command[i].is_a?(Array) ? command[i].flatten.first : command[i]).to_s - hash_tag = extract_hash_tag(key) - hash_tag.empty? ? key : hash_tag + (command[i].is_a?(Array) ? command[i].flatten.first : command[i]).to_s end def extract_all_keys(command) @@ -156,18 +153,6 @@ def determine_optional_key_position(command, option_name) # rubocop:disable Metr idx = command&.flatten&.map(&:to_s)&.map(&:downcase)&.index(option_name&.downcase) idx.nil? ? 0 : idx + 1 end - - # @see https://redis.io/topics/cluster-spec#keys-hash-tags Keys hash tags - def extract_hash_tag(key) - key = key.to_s - s = key.index(LEFT_BRACKET) - return EMPTY_STRING if s.nil? - - e = key.index(RIGHT_BRACKET, s + 1) - return EMPTY_STRING if e.nil? - - key[s + 1..e - 1] - end end end end diff --git a/lib/redis_client/cluster/key_slot_converter.rb b/lib/redis_client/cluster/key_slot_converter.rb index cbb64eaf..8fee4910 100644 --- a/lib/redis_client/cluster/key_slot_converter.rb +++ b/lib/redis_client/cluster/key_slot_converter.rb @@ -3,6 +3,9 @@ class RedisClient class Cluster module KeySlotConverter + EMPTY_STRING = '' + LEFT_BRACKET = '{' + RIGHT_BRACKET = '}' XMODEM_CRC16_LOOKUP = [ 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, @@ -45,6 +48,9 @@ module KeySlotConverter def convert(key) return nil if key.nil? + hash_tag = extract_hash_tag(key) + key = hash_tag unless hash_tag.empty? + crc = 0 key.each_byte do |b| crc = ((crc << 8) & 0xffff) ^ XMODEM_CRC16_LOOKUP[((crc >> 8) ^ b) & 0xff] @@ -52,6 +58,18 @@ def convert(key) crc % HASH_SLOTS end + + # @see https://redis.io/topics/cluster-spec#keys-hash-tags Keys hash tags + def extract_hash_tag(key) + key = key.to_s + s = key.index(LEFT_BRACKET) + return EMPTY_STRING if s.nil? + + e = key.index(RIGHT_BRACKET, s + 1) + return EMPTY_STRING if e.nil? 
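+        # e.g. "foo{bar}baz" -> "bar"; only the first "{...}" pair counts, and an empty
+        # tag such as "foo{}bar" yields "", so convert falls back to hashing the whole key.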
+ + key[s + 1..e - 1] + end end end end diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index dd0e245e..2cc16a17 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -167,21 +167,25 @@ def assign_node(command) find_node(node_key) end - def find_node_key(command, seed: nil) - key = @command.extract_first_key(command) - slot = key.empty? ? nil : ::RedisClient::Cluster::KeySlotConverter.convert(key) - - if @command.should_send_to_primary?(command) - @node.find_node_key_of_primary(slot) || @node.any_primary_node_key(seed: seed) + def find_node_key_by_key(key, seed: nil, primary: false) + if key && !key.empty? + slot = ::RedisClient::Cluster::KeySlotConverter.convert(key) + primary ? @node.find_node_key_of_primary(slot) : @node.find_node_key_of_replica(slot) else - @node.find_node_key_of_replica(slot, seed: seed) || @node.any_replica_node_key(seed: seed) + primary ? @node.any_primary_node_key(seed: seed) : @node.any_replica_node_key(seed: seed) end end + def find_node_key(command, seed: nil) + key = @command.extract_first_key(command) + find_node_key_by_key(key, seed: seed, primary: @command.should_send_to_primary?(command)) + end + def find_primary_node_key(command) key = @command.extract_first_key(command) - slot = key.empty? ? nil : ::RedisClient::Cluster::KeySlotConverter.convert(key) - @node.find_node_key_of_primary(slot) + return nil unless key&.size&.> 0 + + find_node_key_by_key(key, primary: true) end def find_node(node_key, retry_count: 3) diff --git a/test/redis_client/cluster/test_command.rb b/test/redis_client/cluster/test_command.rb index d3d1f211..34a992ad 100644 --- a/test/redis_client/cluster/test_command.rb +++ b/test/redis_client/cluster/test_command.rb @@ -84,7 +84,7 @@ def test_extract_first_key [ { command: %w[SET foo 1], want: 'foo' }, { command: %w[GET foo], want: 'foo' }, - { command: %w[GET foo{bar}baz], want: 'bar' }, + { command: %w[GET foo{bar}baz], want: 'foo{bar}baz' }, { command: %w[MGET foo bar baz], want: 'foo' }, { command: %w[UNKNOWN foo bar], want: '' }, { command: [['GET'], 'foo'], want: 'foo' }, @@ -191,28 +191,6 @@ def test_determine_optional_key_position end end - def test_extract_hash_tag - cmd = ::RedisClient::Cluster::Command.load(@raw_clients) - [ - { key: 'foo', want: '' }, - { key: 'foo{bar}baz', want: 'bar' }, - { key: 'foo{bar}baz{qux}quuc', want: 'bar' }, - { key: 'foo}bar{baz', want: '' }, - { key: 'foo{bar', want: '' }, - { key: 'foo}bar', want: '' }, - { key: 'foo{}bar', want: '' }, - { key: '{}foo', want: '' }, - { key: 'foo{}', want: '' }, - { key: '{}', want: '' }, - { key: '', want: '' }, - { key: nil, want: '' } - ].each_with_index do |c, idx| - msg = "Case: #{idx}" - got = cmd.send(:extract_hash_tag, c[:key]) - assert_equal(c[:want], got, msg) - end - end - def test_extract_all_keys cmd = ::RedisClient::Cluster::Command.load(@raw_clients) [ diff --git a/test/redis_client/cluster/test_key_slot_converter.rb b/test/redis_client/cluster/test_key_slot_converter.rb index 48973adc..7de7609d 100644 --- a/test/redis_client/cluster/test_key_slot_converter.rb +++ b/test/redis_client/cluster/test_key_slot_converter.rb @@ -27,6 +27,27 @@ def test_convert got = ::RedisClient::Cluster::KeySlotConverter.convert(multi_byte_key) assert_equal(want, got, "Case: #{multi_byte_key}") end + + def test_extract_hash_tag + [ + { key: 'foo', want: '' }, + { key: 'foo{bar}baz', want: 'bar' }, + { key: 'foo{bar}baz{qux}quuc', want: 'bar' }, + { key: 'foo}bar{baz', want: '' }, + { key: 
'foo{bar', want: '' }, + { key: 'foo}bar', want: '' }, + { key: 'foo{}bar', want: '' }, + { key: '{}foo', want: '' }, + { key: 'foo{}', want: '' }, + { key: '{}', want: '' }, + { key: '', want: '' }, + { key: nil, want: '' } + ].each_with_index do |c, idx| + msg = "Case: #{idx}" + got = ::RedisClient::Cluster::KeySlotConverter.extract_hash_tag(c[:key]) + assert_equal(c[:want], got, msg) + end + end end end end From 70c3056f2ff15091e0f726d331af8babdcb2e6fd Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Tue, 21 Nov 2023 16:33:17 +1100 Subject: [PATCH 4/7] Add infrastructure for capturing Redis commands in tests --- test/command_capture_middleware.rb | 29 +++++++++++++++++++++++++++++ test/redis_client/test_cluster.rb | 22 +++++++++++++++++----- test/testing_helper.rb | 1 + 3 files changed, 47 insertions(+), 5 deletions(-) create mode 100644 test/command_capture_middleware.rb diff --git a/test/command_capture_middleware.rb b/test/command_capture_middleware.rb new file mode 100644 index 00000000..362e187e --- /dev/null +++ b/test/command_capture_middleware.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module CommandCaptureMiddleware + CapturedCommand = Struct.new(:server_url, :command, :pipelined, keyword_init: true) do + def inspect + "#<#{self.class.name} [on #{server_url}] #{command.join(' ')} >" + end + end + + def call(command, redis_config) + redis_config.custom[:captured_commands] << CapturedCommand.new( + server_url: redis_config.server_url, + command: command, + pipelined: false + ) + super + end + + def call_pipelined(commands, redis_config) + commands.map do |command| + redis_config.custom[:captured_commands] << CapturedCommand.new( + server_url: redis_config.server_url, + command: command, + pipelined: true + ) + end + super + end +end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 02f477f7..a7b04e50 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -6,9 +6,11 @@ class RedisClient class TestCluster module Mixin def setup + @captured_commands = [] @client = new_test_client @client.call('FLUSHDB') wait_for_replication + @captured_commands.clear end def teardown @@ -516,10 +518,12 @@ def hiredis_used? 
class PrimaryOnly < TestingWrapper include Mixin - def new_test_client + def new_test_client(capture_buffer: @captured_commands) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, fixed_hostname: TEST_FIXED_HOSTNAME, + middlewares: [CommandCaptureMiddleware], + custom: { captured_commands: capture_buffer }, **TEST_GENERIC_OPTIONS ) ::RedisClient::Cluster.new(config) @@ -529,12 +533,14 @@ def new_test_client class ScaleReadRandom < TestingWrapper include Mixin - def new_test_client + def new_test_client(capture_buffer: @captured_commands) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, replica: true, replica_affinity: :random, fixed_hostname: TEST_FIXED_HOSTNAME, + middlewares: [CommandCaptureMiddleware], + custom: { captured_commands: capture_buffer }, **TEST_GENERIC_OPTIONS ) ::RedisClient::Cluster.new(config) @@ -544,12 +550,14 @@ def new_test_client class ScaleReadRandomWithPrimary < TestingWrapper include Mixin - def new_test_client + def new_test_client(capture_buffer: @captured_commands) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, replica: true, replica_affinity: :random_with_primary, fixed_hostname: TEST_FIXED_HOSTNAME, + middlewares: [CommandCaptureMiddleware], + custom: { captured_commands: capture_buffer }, **TEST_GENERIC_OPTIONS ) ::RedisClient::Cluster.new(config) @@ -559,12 +567,14 @@ def new_test_client class ScaleReadLatency < TestingWrapper include Mixin - def new_test_client + def new_test_client(capture_buffer: @captured_commands) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, replica: true, replica_affinity: :latency, fixed_hostname: TEST_FIXED_HOSTNAME, + middlewares: [CommandCaptureMiddleware], + custom: { captured_commands: capture_buffer }, **TEST_GENERIC_OPTIONS ) ::RedisClient::Cluster.new(config) @@ -574,10 +584,12 @@ def new_test_client class Pooled < TestingWrapper include Mixin - def new_test_client + def new_test_client(capture_buffer: @captured_commands) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, fixed_hostname: TEST_FIXED_HOSTNAME, + middlewares: [CommandCaptureMiddleware], + custom: { captured_commands: capture_buffer }, **TEST_GENERIC_OPTIONS ) ::RedisClient::Cluster.new(config, pool: { timeout: TEST_TIMEOUT_SEC, size: 2 }) diff --git a/test/testing_helper.rb b/test/testing_helper.rb index 76f02e75..4d09ff23 100644 --- a/test/testing_helper.rb +++ b/test/testing_helper.rb @@ -6,6 +6,7 @@ require 'redis-cluster-client' require 'testing_constants' require 'cluster_controller' +require 'command_capture_middleware' case ENV.fetch('REDIS_CONNECTION_DRIVER', 'ruby') when 'hiredis' then require 'hiredis-client' From 97d0499c26da3435518e420b8885328e9cf123f9 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Wed, 22 Nov 2023 09:23:20 +1100 Subject: [PATCH 5/7] Handle timeouts to blocking_v specially via a custom client class try_send already handles the case where the call is a blocking one specially, but try_delegate does not. This diff detects whether or not a ::RedisClient::ReadTimeoutError is for a blocking call at the source, and wraps it in a special subclass so we can differentiate it. 
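For illustration, a caller can then tell the two flavours of timeout apart along
these lines (a rough sketch only; `cluster` stands for any RedisClient::Cluster
instance, and the rescue bodies are placeholders):

```ruby
begin
  # 1-second client-side timeout around a BLPOP that blocks server-side forever.
  cluster.blocking_call_v(1, ['BLPOP', 'jobs', 0])
rescue RedisClient::Cluster::BlockingReadTimeoutError
  # The list simply stayed empty for the whole timeout; the node is healthy, so
  # no CLUSTER NODES refresh is needed.
rescue RedisClient::ConnectionError
  # A non-blocking read timed out or the socket dropped; the router refreshes its
  # view of the cluster before this error propagates.
  raise
end
```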
--- lib/redis_client/cluster/errors.rb | 2 ++ lib/redis_client/cluster/node.rb | 18 +++++++++++++++++- lib/redis_client/cluster/router.rb | 7 ++++--- .../cluster/node/test_latency_replica.rb | 10 +++++----- .../cluster/node/test_primary_only.rb | 10 +++++----- .../cluster/node/test_random_replica.rb | 10 +++++----- .../node/test_random_replica_or_primary.rb | 10 +++++----- test/redis_client/cluster/test_node.rb | 4 ++-- 8 files changed, 45 insertions(+), 26 deletions(-) diff --git a/lib/redis_client/cluster/errors.rb b/lib/redis_client/cluster/errors.rb index 0367b3d4..83e289df 100644 --- a/lib/redis_client/cluster/errors.rb +++ b/lib/redis_client/cluster/errors.rb @@ -55,5 +55,7 @@ def initialize(_ = '') ) end end + + class BlockingReadTimeoutError < ::RedisClient::ReadTimeoutError; end end end diff --git a/lib/redis_client/cluster/node.rb b/lib/redis_client/cluster/node.rb index dc362e18..46a27d7b 100644 --- a/lib/redis_client/cluster/node.rb +++ b/lib/redis_client/cluster/node.rb @@ -80,7 +80,7 @@ def []=(index, element) class Config < ::RedisClient::Config def initialize(scale_read: false, **kwargs) @scale_read = scale_read - super(**kwargs) + super(**kwargs, client_implementation: Client) end private @@ -92,6 +92,22 @@ def build_connection_prelude end end + class Client < ::RedisClient + # We need to be able to differentiate between timeout errors caused by blocking read timeouts + # (which should NOT cause a cluster topology update) with normal read timeouts (which should) + def blocking_call(timeout, *command, **kwargs) + super + rescue ::RedisClient::TimeoutError => e + raise ::RedisClient::Cluster::BlockingReadTimeoutError, e.message + end + + def blocking_call_v(timeout, command) + super + rescue ::RedisClient::TimeoutError => e + raise ::RedisClient::Cluster::BlockingReadTimeoutError, e.message + end + end + class << self def load_info(options, concurrent_worker, slow_command_timeout: -1, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity raise ::RedisClient::Cluster::InitialSetupError, [] if options.nil? || options.empty? diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index 2cc16a17..a22f6e7b 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -13,7 +13,6 @@ class RedisClient class Cluster class Router ZERO_CURSOR_FOR_SCAN = '0' - METHODS_FOR_BLOCKING_CMD = %i[blocking_call_v blocking_call].freeze TSF = ->(f, x) { f.nil? ? x : f.call(x) }.curry def initialize(config, concurrent_worker, pool: nil, **kwargs) @@ -98,9 +97,9 @@ def try_send(node, method, command, args, retry_count: 3, &block) # rubocop:disa retry if retry_count >= 0 end raise + rescue ::RedisClient::Cluster::BlockingReadTimeoutError + raise rescue ::RedisClient::ConnectionError => e - raise if METHODS_FOR_BLOCKING_CMD.include?(method) && e.is_a?(RedisClient::ReadTimeoutError) - update_cluster_info! raise if retry_count <= 0 @@ -129,6 +128,8 @@ def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block) # ruboco retry if retry_count >= 0 end raise + rescue ::RedisClient::Cluster::BlockingReadTimeoutError + raise rescue ::RedisClient::ConnectionError update_cluster_info! 
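The assert_instance_of to assert_kind_of changes in the node tests below follow
from the new `client_implementation: Client` option: node connections are now
instances of a `::RedisClient` subclass, which `instance_of?` rejects but
`kind_of?` accepts. A minimal standalone illustration:

```ruby
class Base; end
class Sub < Base; end

Sub.new.instance_of?(Base) #=> false
Sub.new.kind_of?(Base)     #=> true
```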
diff --git a/test/redis_client/cluster/node/test_latency_replica.rb b/test/redis_client/cluster/node/test_latency_replica.rb index 66987523..d85ffdfd 100644 --- a/test/redis_client/cluster/node/test_latency_replica.rb +++ b/test/redis_client/cluster/node/test_latency_replica.rb @@ -11,7 +11,7 @@ class TestLatencyReplica < TestingWrapper def test_clients_with_redis_client got = @test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient, client) } + got.each_value { |client| assert_kind_of(::RedisClient, client) } assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) end @@ -25,7 +25,7 @@ def test_clients_with_pooled_redis_client ) got = test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient::Pooled, client) } + got.each_value { |client| assert_kind_of(::RedisClient::Pooled, client) } assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) ensure test_topology&.clients&.each_value(&:close) @@ -34,7 +34,7 @@ def test_clients_with_pooled_redis_client def test_primary_clients got = @test_topology.primary_clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -42,14 +42,14 @@ def test_primary_clients def test_replica_clients got = @test_topology.replica_clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('slave', client.call('ROLE').first) end end def test_clients_for_scanning got = @test_topology.clients_for_scanning - got.each_value { |client| assert_instance_of(::RedisClient, client) } + got.each_value { |client| assert_kind_of(::RedisClient, client) } assert_equal(TEST_SHARD_SIZE, got.size) end diff --git a/test/redis_client/cluster/node/test_primary_only.rb b/test/redis_client/cluster/node/test_primary_only.rb index ec60afdd..e8983dd8 100644 --- a/test/redis_client/cluster/node/test_primary_only.rb +++ b/test/redis_client/cluster/node/test_primary_only.rb @@ -12,7 +12,7 @@ class TestPrimaryOnly < TestingWrapper def test_clients_with_redis_client got = @test_topology.clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -28,7 +28,7 @@ def test_clients_with_pooled_redis_client got = test_topology.clients got.each_value do |client| - assert_instance_of(::RedisClient::Pooled, client) + assert_kind_of(::RedisClient::Pooled, client) assert_equal('master', client.call('ROLE').first) end ensure @@ -38,7 +38,7 @@ def test_clients_with_pooled_redis_client def test_primary_clients got = @test_topology.primary_clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -46,7 +46,7 @@ def test_primary_clients def test_replica_clients got = @test_topology.replica_clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -54,7 +54,7 @@ def test_replica_clients def test_clients_for_scanning got = @test_topology.clients_for_scanning got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end diff --git 
a/test/redis_client/cluster/node/test_random_replica.rb b/test/redis_client/cluster/node/test_random_replica.rb index 1116bf7a..969806c2 100644 --- a/test/redis_client/cluster/node/test_random_replica.rb +++ b/test/redis_client/cluster/node/test_random_replica.rb @@ -11,7 +11,7 @@ class TestRandomReplica < TestingWrapper def test_clients_with_redis_client got = @test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient, client) } + got.each_value { |client| assert_kind_of(::RedisClient, client) } assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) end @@ -25,7 +25,7 @@ def test_clients_with_pooled_redis_client ) got = test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient::Pooled, client) } + got.each_value { |client| assert_kind_of(::RedisClient::Pooled, client) } assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) ensure test_topology&.clients&.each_value(&:close) @@ -34,7 +34,7 @@ def test_clients_with_pooled_redis_client def test_primary_clients got = @test_topology.primary_clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -42,14 +42,14 @@ def test_primary_clients def test_replica_clients got = @test_topology.replica_clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('slave', client.call('ROLE').first) end end def test_clients_for_scanning got = @test_topology.clients_for_scanning - got.each_value { |client| assert_instance_of(::RedisClient, client) } + got.each_value { |client| assert_kind_of(::RedisClient, client) } assert_equal(TEST_SHARD_SIZE, got.size) end diff --git a/test/redis_client/cluster/node/test_random_replica_or_primary.rb b/test/redis_client/cluster/node/test_random_replica_or_primary.rb index 33319d20..a40e5b72 100644 --- a/test/redis_client/cluster/node/test_random_replica_or_primary.rb +++ b/test/redis_client/cluster/node/test_random_replica_or_primary.rb @@ -11,7 +11,7 @@ class TestRandomReplicaWithPrimary < TestingWrapper def test_clients_with_redis_client got = @test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient, client) } + got.each_value { |client| assert_kind_of(::RedisClient, client) } assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) end @@ -25,7 +25,7 @@ def test_clients_with_pooled_redis_client ) got = test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient::Pooled, client) } + got.each_value { |client| assert_kind_of(::RedisClient::Pooled, client) } assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) ensure test_topology&.clients&.each_value(&:close) @@ -34,7 +34,7 @@ def test_clients_with_pooled_redis_client def test_primary_clients got = @test_topology.primary_clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -42,14 +42,14 @@ def test_primary_clients def test_replica_clients got = @test_topology.replica_clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('slave', client.call('ROLE').first) end end def test_clients_for_scanning got = @test_topology.clients_for_scanning - got.each_value { |client| 
assert_instance_of(::RedisClient, client) } + got.each_value { |client| assert_kind_of(::RedisClient, client) } assert_equal(TEST_SHARD_SIZE, got.size) end diff --git a/test/redis_client/cluster/test_node.rb b/test/redis_client/cluster/test_node.rb index d0819279..d78d765d 100644 --- a/test/redis_client/cluster/test_node.rb +++ b/test/redis_client/cluster/test_node.rb @@ -331,14 +331,14 @@ def test_find_by msg = "Case: primary only: #{info.node_key}" got = -> { @test_node.find_by(info.node_key) } if info.primary? - assert_instance_of(::RedisClient, got.call, msg) + assert_kind_of(::RedisClient, got.call, msg) else assert_raises(::RedisClient::Cluster::Node::ReloadNeeded, msg, &got) end msg = "Case: scale read: #{info.node_key}" got = @test_node_with_scale_read.find_by(info.node_key) - assert_instance_of(::RedisClient, got, msg) + assert_kind_of(::RedisClient, got, msg) end end From 3bdd331929a87157528b732d9b3835813bfed759 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Wed, 22 Nov 2023 17:03:01 +1100 Subject: [PATCH 6/7] Implement RedisClient::Cluster#with This implements support for "pinning" a cluster client to a particular keyslot. This allows the use of WATCH/MULTI/EXEC transactions on that node without any ambiguity. Because RedisClient also implements watch as "yield self" and ignores all arguments, this means that you can easily write transactions that are compatible with both cluster and non-clustered redis (provided you use hashtags for your keys). --- .rubocop.yml | 2 + README.md | 88 ++++++++++++ lib/redis_client/cluster.rb | 15 ++ lib/redis_client/cluster/pinning.rb | 123 +++++++++++++++++ lib/redis_client/cluster/router.rb | 4 + test/redis_client/test_cluster.rb | 207 ++++++++++++++++++++++++++++ test/test_against_cluster_state.rb | 46 +++++++ 7 files changed, 485 insertions(+) create mode 100644 lib/redis_client/cluster/pinning.rb diff --git a/.rubocop.yml b/.rubocop.yml index 6d1f5936..a81474ce 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -19,6 +19,8 @@ Metrics/ClassLength: Metrics/ModuleLength: Max: 500 + Exclude: + - 'test/**/*' Metrics/MethodLength: Max: 50 diff --git a/README.md b/README.md index 6bda4aaf..b333b935 100644 --- a/README.md +++ b/README.md @@ -168,6 +168,94 @@ cli.call('MGET', '{key}1', '{key}2', '{key}3') #=> [nil, nil, nil] ``` +## Transactions +This gem supports [Redis transactions](https://redis.io/topics/transactions), including atomicity with `MULTI`/`EXEC`, +and conditional execution with `WATCH`. Redis does not support cross-slot transactions, so all keys used within a +transaction must live in the same key slot. To use transactions, you must thus "pin" your client to a single slot using +`#with`. Pass a key to `#with`, and the yielded connection object will enforce that all keys used in the block must +belong to the same slot. 
+ +```ruby +cli.with(key: '{key}1') do |conn| + conn.call('SET', '{key}1', 'This is OK') + #=> "OK" + conn.call('SET', '{key}2', 'This is also OK; it has the same hashtag, and so the same slot') + #=> "OK" + conn.call('SET', '{otherslot}3', 'This will raise an exception; this key is not in the same slot as {key}1') + #=> Connection pinned to slot 12539 but command ["SET", "{otherslot}3", "..."] includes key {otherslot}3 in slot 10271 (RedisClient::Cluster::Transaction::ConsistencyError) +end +``` + +When using hash tags to enforce same-slot key hashing, it's often neat to simply pass the hashtag _only_ to `#with`: + +```ruby +cli.with(key: '{myslot}') do |conn| + # You can use any key starting with {myslot} in this block +end +``` + +Once you have pinned a client to a particular slot, you can use the same transaction APIs as the +[redis-client](https://github.com/redis-rb/redis-client#usage) gem allows. + +```ruby +# No concurrent client will ever see the value 1 in 'mykey'; it will see either zero or two. +cli.call('SET', 'key', 0) +cli.with(key: 'key') do |conn| + conn.multi do |txn| + txn.call('INCR', 'key') + txn.call('INCR', 'key') + end + #=> ['OK', 'OK'] +end + +# Conditional execution with WATCH can be used to e.g. atomically swap two keys +cli.call('MSET', '{key}1', 'v1', '{key}2', 'v2') +cli.with(key: '{key}') do |conn| + conn.call('WATCH', '{key}1', '{key}2') + conn.multi do |txn| + old_key1 = conn.call('GET', '{key}1') + old_key2 = conn.call('GET', '{key}2') + txn.call('SET', '{key}1', old_key2) + txn.call('SET', '{key}2', old_key1) + end + # This transaction will swap the values of {key}1 and {key}2 only if no concurrent connection modified + # either of the values +end + +# You can also pass watch: to #multi as a shortcut +cli.call('MSET', '{key}1', 'v1', '{key}2', 'v2') +cli.with(key: '{key}') do |conn| + conn.multi(watch: ['{key}1', '{key}2']) do |txn| + old_key1, old_key2 = conn.call('MGET', '{key}1', '{key}2') + txn.call('MSET', '{key}1', old_key2, '{key}2', old_key1) + end +end +``` + +Pinned connections are aware of redirections and node failures like ordinary calls to `RedisClient::Cluster`, but because +you may have written non-idempotent code inside your block, the block is not automatically retried if e.g. the slot +it is operating on moves to a different node. If you want this, you can opt-in to retries by passing nonzero +`retry_count` to `#with`. + +```ruby +cli.with(key: '{key}', retry_count: 1) do |conn| + conn.call('GET', '{key}1') + #=> "value1" + + # Now, some changes in cluster topology mean that {key} is moved to a different node! + + conn.call('GET', '{key}2') + #=> MOVED 9039 127.0.0.1:16381 (RedisClient::CommandError) + + # Luckily, the block will get retried (once) and so both GETs will be re-executed on the newly-discovered + # correct node. +end +``` + +Because `RedisClient` from the redis-client gem implements `#with` as simply `yield self` and ignores all of its +arguments, it's possible to write code which is compatible with both redis-client and redis-cluster-client; the `#with` +call will pin the connection to a slot when using clustering, or be a no-op when not. + ## ACL The cluster client internally calls [COMMAND](https://redis.io/commands/command/) and [CLUSTER NODES](https://redis.io/commands/cluster-nodes/) commands to operate correctly. So please permit it like the followings. 
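To make the compatibility claim above concrete, here is a sketch of a helper that
runs unchanged against a plain `RedisClient` (where `#with` simply yields the
client) and against `RedisClient::Cluster` (where `#with` pins to the slot of
`from`); the `transfer` method, `cli`, and the key names are invented for
illustration:

```ruby
# Under cluster, `from` and `to` must share a hash tag so they hash to one slot.
def transfer(redis, from, to, amount)
  redis.with(key: from) do |conn|
    conn.multi do |txn|
      txn.call('DECRBY', from, amount)
      txn.call('INCRBY', to, amount)
    end
  end
end

transfer(cli, '{acct}alice', '{acct}bob', 100) #=> the two new balances, e.g. [900, 1100]
```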
diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index 0ea6eccd..839cf0a5 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require 'redis_client/cluster/concurrent_worker' +require 'redis_client/cluster/pinning' require 'redis_client/cluster/pipeline' require 'redis_client/cluster/pub_sub' require 'redis_client/cluster/router' @@ -97,6 +98,20 @@ def pubsub ::RedisClient::Cluster::PubSub.new(@router, @command_builder) end + def with(key:, write: true, retry_count: 0) + raise ArgumentError, 'key must be provided' if key.nil? || key.empty? + + slot = ::RedisClient::Cluster::KeySlotConverter.convert(key) + node_key = @router.find_node_key_by_key(key, primary: write) + node = @router.find_node(node_key) + # Calling #with checks out the underlying connection if this is a pooled connection + # Calling it through #try_delegate ensures we handle any redirections and retry the entire + # transaction if so. + @router.try_delegate(node, :with, retry_count: retry_count) do |conn| + yield ::RedisClient::Cluster::Pinning::ConnectionProxy.new(conn, slot, @router, @command_builder) + end + end + def close @concurrent_worker.close @router.close diff --git a/lib/redis_client/cluster/pinning.rb b/lib/redis_client/cluster/pinning.rb new file mode 100644 index 00000000..da19b339 --- /dev/null +++ b/lib/redis_client/cluster/pinning.rb @@ -0,0 +1,123 @@ +# frozen_string_literal: true + +require 'redis_client' +require 'delegate' + +class RedisClient + class Cluster + module Pinning + EMPTY_ARRAY = [].freeze + + class BaseDelegator < SimpleDelegator + def initialize(conn, slot, router, command_builder) + @conn = conn + @slot = slot + @router = router + @command_builder = command_builder + super(@conn) + end + + private + + def ensure_key_not_cross_slot!(key, method_name) + slot = ::RedisClient::Cluster::KeySlotConverter.convert(key) + return unless slot != @slot + + raise ::RedisClient::Cluster::Transaction::ConsistencyError.new, + "Connection pinned to slot #{@slot} but method #{method_name} called with key #{key} in slot #{slot}" + end + + def ensure_command_not_cross_slot!(command) + command_keys = @router.command_extract_all_keys(command) + command_keys.each do |key| + slot = ::RedisClient::Cluster::KeySlotConverter.convert(key) + if slot != @slot + raise ::RedisClient::Cluster::Transaction::ConsistencyError.new, + "Connection pinned to slot #{@slot} but command #{command.inspect} includes key #{key} in slot #{slot}" + end + end + end + end + + module CallDelegator + def call(*command, **kwargs) + command = @command_builder.generate(command, kwargs) + ensure_command_not_cross_slot! command + super + end + alias call_once call + + def call_v(command) + command = @command_builder.generate(command) + ensure_command_not_cross_slot! command + super + end + alias call_once_v call_v + end + + module BlockingCallDelegator + def blocking_call(timeout, *command, **kwargs) + command = @command_builder.generate(command, kwargs) + ensure_command_not_cross_slot! command + super + end + + def blocking_call_v(timeout, command) + command = @command_builder.generate(command) + ensure_command_not_cross_slot! 
command + super + end + end + + class PipelineProxy < BaseDelegator + include CallDelegator + include BlockingCallDelegator + end + + class MultiProxy < BaseDelegator + include CallDelegator + end + + class ConnectionProxy < BaseDelegator + include CallDelegator + include BlockingCallDelegator + + def sscan(key, *args, **kwargs, &block) + ensure_key_not_cross_slot! key, :sscan + super + end + + def hscan(key, *args, **kwargs, &block) + ensure_key_not_cross_slot! key, :hscan + super + end + + def zscan(key, *args, **kwargs, &block) + ensure_key_not_cross_slot! key, :zscan + super + end + + def pipelined + @conn.pipelined do |conn_pipeline| + yield PipelineProxy.new(conn_pipeline, @slot, @router, @command_builder) + end + end + + def multi(watch: nil) + call('WATCH', *watch) if watch + begin + @conn.pipelined do |conn_pipeline| + txn = MultiProxy.new(conn_pipeline, @slot, @router, @command_builder) + txn.call('MULTI') + yield txn + txn.call('EXEC') + end.last + rescue StandardError + call('UNWATCH') if connected? && watch + raise + end + end + end + end + end +end diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index a22f6e7b..40ce08ce 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -203,6 +203,10 @@ def command_exists?(name) @command.exists?(name) end + def command_extract_all_keys(command) + @command.extract_all_keys(command) + end + def assign_redirection_node(err_msg) _, slot, node_key = err_msg.split slot = slot.to_i diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index a7b04e50..f4fbe24d 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -480,6 +480,213 @@ def test_circuit_breakers cli&.close end + def test_pinning_single_key + got = @client.with(key: 'key1') do |conn| + conn.call('SET', 'key1', 'hello') + conn.call('GET', 'key1') + end + assert_equal('hello', got) + end + + def test_pinning_no_key + assert_raises(ArgumentError) do + @client.with(key: nil) {} + end + end + + def test_pinning_two_keys + got = @client.with(key: '{slot}') do |conn| + conn.call('SET', '{slot}key1', 'v1') + conn.call('SET', '{slot}key2', 'v2') + conn.call('MGET', '{slot}key1', '{slot}key2') + end + assert_equal(%w[v1 v2], got) + end + + def test_pinning_cross_slot + assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do + @client.with(key: '{slot1}') do |conn| + conn.call('GET', '{slot2}') + end + end + end + + def test_pinning_pipeline + got = @client.with(key: '{slot}') do |conn| + conn.call_v(['SET', '{slot}counter', 0]) + conn.pipelined do |pipe| + pipe.call_v(['INCR', '{slot}counter']) + pipe.call_v(['INCR', '{slot}counter']) + pipe.call_v(['INCR', '{slot}counter']) + end + conn.call_v(['GET', '{slot}counter']).to_i + end + + assert_equal(3, got) + end + + def test_pinning_pipeline_with_error + assert_raises(RedisClient::CommandError) do + @client.with(key: '{slot}') do |conn| + conn.pipelined do |pipeline| + pipeline.call('SET', '{slot}key', 'first') + pipeline.call('SET', '{slot}key', 'second', 'too many args') + pipeline.call('SET', '{slot}key', 'third') + end + end + end + + wait_for_replication + assert_equal('third', @client.call('GET', '{slot}key')) + end + + def test_pinning_transaction + got = @client.with(key: '{slot}') do |conn| + conn.multi do |txn| + txn.call('SET', '{slot}key1', 'value1') + txn.call('SET', '{slot}key2', 'value2') + end + end + + assert_equal(%w[OK OK], got) + end + + def 
test_pinning_transaction_watch_arg + @client.call('MSET', '{slot}key1', 'val1', '{slot}key2', 'val2') + @captured_commands.clear + + got = @client.with(key: '{slot}') do |conn| + conn.multi(watch: ['{slot}key1', '{slot}key2']) do |txn| + old_key1, old_key2 = conn.call('MGET', '{slot}key1', '{slot}key2') + txn.call('MSET', '{slot}key1', old_key2, '{slot}key2', old_key1) + end + end + + assert_equal([ + %w[WATCH {slot}key1 {slot}key2], + %w[MGET {slot}key1 {slot}key2], + %w[MULTI], + %w[MSET {slot}key1 val2 {slot}key2 val1], + %w[EXEC] + ], @captured_commands.map(&:command)) + + wait_for_replication + assert_equal(['OK'], got) + assert_equal(%w[val2 val1], @client.call('MGET', '{slot}key1', '{slot}key2')) + end + + def test_pinning_transaction_watch_arg_unwatches_on_raise + ex = Class.new(StandardError) + @captured_commands.clear + + assert_raises(ex) do + @client.with(key: '{slot}') do |conn| + conn.multi(watch: ['{slot}key1']) do |_txn| + conn.call('GET', '{slot}key1') + raise ex, 'boom' + end + end + end + + assert_equal([ + %w[WATCH {slot}key1], + %w[GET {slot}key1], + %w[UNWATCH] + ], @captured_commands.map(&:command)) + end + + def test_pinning_transaction_can_watch_manually + @client.call('MSET', '{slot}key1', 'val1', '{slot}key2', 'val2') + @captured_commands.clear + + got = @client.with(key: '{slot}') do |conn| + conn.call('WATCH', '{slot}key1', '{slot}key2') + old_key1, old_key2 = conn.call('MGET', '{slot}key1', '{slot}key2') + conn.multi do |txn| + txn.call('MSET', '{slot}key1', old_key2, '{slot}key2', old_key1) + end + end + + assert_equal([ + %w[WATCH {slot}key1 {slot}key2], + %w[MGET {slot}key1 {slot}key2], + %w[MULTI], + %w[MSET {slot}key1 val2 {slot}key2 val1], + %w[EXEC] + ], @captured_commands.map(&:command)) + + wait_for_replication + assert_equal(['OK'], got) + assert_equal(%w[val2 val1], @client.call('MGET', '{slot}key1', '{slot}key2')) + end + + def test_pinning_transaction_can_unwatch_manually + got = @client.with(key: '{slot}') do |conn| + conn.call('WATCH', '{slot}key1') + conn.call('UNWATCH') + end + + assert_equal('OK', got) + end + + def test_pinning_nonblocking_timeouts_update_topology + @client.call('DEL', '{slot}list') + @captured_commands.clear + + assert_raises(::RedisClient::ReadTimeoutError) do + @client.with(key: '{slot}') do |conn| + conn.call_v(['BLPOP', '{slot}list', 0]) + end + end + assert_includes(@captured_commands.map(&:command), %w[CLUSTER NODES]) + end + + def test_pinning_blocking_timeouts_do_not_update_topology + @client.call('DEL', '{slot}list') + @captured_commands.clear + + assert_raises(::RedisClient::ReadTimeoutError) do + @client.with(key: '{slot}') do |conn| + conn.blocking_call_v(1, ['BLPOP', '{slot}list', 0]) + end + end + refute_includes(@captured_commands.map(&:command), %w[CLUSTER NODES]) + end + + def test_pinning_sscan + @client.call('DEL', '{slot}set') + expected_set = Set.new + scanned_set = Set.new + 1000.times do |i| + expected_set << i + @client.call('SADD', '{slot}set', i) + end + @client.with(key: '{slot}') do |conn| + conn.sscan('{slot}set') do |i| + scanned_set << i.to_i + end + end + + assert_equal(expected_set, scanned_set) + end + + def test_pinning_zscan + @client.call('DEL', '{slot}set') + expected_set = Set.new + scanned_set = Set.new + 1000.times do |i| + expected_set << "member#{i}" + @client.call('ZADD', '{slot}set', i, "member#{i}") + end + @client.with(key: '{slot}') do |conn| + conn.zscan('{slot}set') do |i| + scanned_set << i + end + end + + assert_equal(expected_set, scanned_set) + end + private def 
wait_for_replication diff --git a/test/test_against_cluster_state.rb b/test/test_against_cluster_state.rb index 813ba4df..096e9cbf 100644 --- a/test/test_against_cluster_state.rb +++ b/test/test_against_cluster_state.rb @@ -84,6 +84,52 @@ def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection end end + def test_aborts_pinning_on_resharding + @client.call_v(['SET', '{slot}counter', 0]) + slot = ::RedisClient::Cluster::KeySlotConverter.convert('{slot}') + + assert_raises(::RedisClient::CommandError) do + @client.with(key: '{slot}') do |conn| + conn.call_v(['INCR', '{slot}counter']) # should work + src, dest = @controller.select_resharding_target(slot) + @controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest) + @controller.finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest) + # this should now fail for want of being on the wrong slot. + conn.call_v(['INCR', '{slot}counter']) + end + end + + # The second one will now work though, because we processed the resharding. + @client.with(key: '{slot}') do |conn| + conn.call_v(['INCR', '{slot}counter']) + end + + # There were two INCRs which should work. + assert_equal(2, @client.call_v(['GET', '{slot}counter']).to_i) + end + + def test_can_retry_pinning_on_resharding + @client.call_v(['SET', '{slot}counter', 0]) + slot = ::RedisClient::Cluster::KeySlotConverter.convert('{slot}') + first_time = true + @client.with(key: '{slot}', retry_count: 1) do |conn| + conn.call_v(['INCR', '{slot}counter']) # should work + + if first_time + src, dest = @controller.select_resharding_target(slot) + @controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest) + @controller.finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest) + first_time = false + end + + # this should now fail for want of being on the wrong slot (the first time) + conn.call_v(['INCR', '{slot}counter']) + end + + # There were three INCRs which should work. + assert_equal(3, @client.call_v(['GET', '{slot}counter']).to_i) + end + private def wait_for_replication From ead279bdec4dae6c787b905cbba4ca673fe43a17 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Thu, 30 Nov 2023 20:29:17 +1100 Subject: [PATCH 7/7] Re-implement RedisClient::Cluster#multi in terms of #with This does change the behaviour ever so slightly (see updated tests), but I believe for the better. --- lib/redis_client/cluster.rb | 11 +++-- lib/redis_client/cluster/errors.rb | 4 ++ lib/redis_client/cluster/transaction.rb | 57 ------------------------- test/redis_client/test_cluster.rb | 23 +++++----- 4 files changed, 23 insertions(+), 72 deletions(-) delete mode 100644 lib/redis_client/cluster/transaction.rb diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index 839cf0a5..27aff18f 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -5,7 +5,6 @@ require 'redis_client/cluster/pipeline' require 'redis_client/cluster/pub_sub' require 'redis_client/cluster/router' -require 'redis_client/cluster/transaction' class RedisClient class Cluster @@ -90,8 +89,14 @@ def pipelined pipeline.execute end - def multi(watch: nil, &block) - ::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block) + def multi(watch: nil, key: nil, &block) + slot_key = key + slot_key = watch&.first if slot_key.nil? + raise ArgumentError, 'watch or key must be provided' if slot_key.nil? 
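+      # Pin to slot_key's slot; every key touched inside the block must hash to it.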
+ + with(key: slot_key) do |conn| + conn.multi(watch: watch, &block) + end end def pubsub diff --git a/lib/redis_client/cluster/errors.rb b/lib/redis_client/cluster/errors.rb index 83e289df..33dc5dcf 100644 --- a/lib/redis_client/cluster/errors.rb +++ b/lib/redis_client/cluster/errors.rb @@ -57,5 +57,9 @@ def initialize(_ = '') end class BlockingReadTimeoutError < ::RedisClient::ReadTimeoutError; end + + module Transaction + ConsistencyError = Class.new(::RedisClient::Error) + end end end diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb deleted file mode 100644 index 7302a283..00000000 --- a/lib/redis_client/cluster/transaction.rb +++ /dev/null @@ -1,57 +0,0 @@ -# frozen_string_literal: true - -require 'redis_client' - -class RedisClient - class Cluster - class Transaction - ConsistencyError = Class.new(::RedisClient::Error) - - def initialize(router, command_builder) - @router = router - @command_builder = command_builder - @node_key = nil - end - - def call(*command, **kwargs, &_) - command = @command_builder.generate(command, kwargs) - ensure_node_key(command) - end - - def call_v(command, &_) - command = @command_builder.generate(command) - ensure_node_key(command) - end - - def call_once(*command, **kwargs, &_) - command = @command_builder.generate(command, kwargs) - ensure_node_key(command) - end - - def call_once_v(command, &_) - command = @command_builder.generate(command) - ensure_node_key(command) - end - - def execute(watch: nil, &block) - yield self - raise ArgumentError, 'empty transaction' if @node_key.nil? - - node = @router.find_node(@node_key) - @router.try_delegate(node, :multi, watch: watch, &block) - end - - private - - def ensure_node_key(command) - node_key = @router.find_primary_node_key(command) - raise ConsistencyError, "Client couldn't determine the node to be executed the transaction by: #{command}" if node_key.nil? 
- - @node_key ||= node_key - raise ConsistencyError, "The transaction should be done for single node: #{@node_key}, #{node_key}" if node_key != @node_key - - nil - end - end - end -end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index f4fbe24d..2d387799 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -178,7 +178,7 @@ def test_pipelined_with_many_commands end def test_transaction_with_single_key - got = @client.multi do |t| + got = @client.multi(key: 'counter') do |t| t.call('SET', 'counter', '0') t.call('INCR', 'counter') t.call('INCR', 'counter') @@ -190,7 +190,7 @@ def test_transaction_with_single_key def test_transaction_with_multiple_key assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do - @client.multi do |t| + @client.multi(key: 'key1') do |t| t.call('SET', 'key1', '1') t.call('SET', 'key2', '2') t.call('SET', 'key3', '3') @@ -204,20 +204,19 @@ def test_transaction_with_multiple_key def test_transaction_with_empty_block assert_raises(ArgumentError) { @client.multi {} } - assert_raises(LocalJumpError) { @client.multi } + assert_raises(LocalJumpError) { @client.multi(key: 'foo') } end def test_transaction_with_keyless_commands - assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do - @client.multi do |t| - t.call('ECHO', 'foo') - t.call('ECHO', 'bar') - end + got = @client.multi(key: 'hello') do |t| + t.call('ECHO', 'foo') + t.call('ECHO', 'bar') end + assert_equal %w[foo bar], got end def test_transaction_with_hashtag - got = @client.multi do |t| + got = @client.multi(key: '{key}') do |t| t.call('MSET', '{key}1', '1', '{key}2', '2') t.call('MSET', '{key}3', '3', '{key}4', '4') end @@ -228,14 +227,14 @@ def test_transaction_with_hashtag def test_transaction_without_hashtag assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do - @client.multi do |t| + @client.multi(key: 'key1') do |t| t.call('MSET', 'key1', '1', 'key2', '2') t.call('MSET', 'key3', '3', 'key4', '4') end end - assert_raises(::RedisClient::CommandError, 'CROSSSLOT keys') do - @client.multi do |t| + assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do + @client.multi(key: 'key1') do |t| t.call('MSET', 'key1', '1', 'key2', '2') t.call('MSET', 'key1', '1', 'key3', '3') t.call('MSET', 'key1', '1', 'key4', '4')