diff --git a/.rubocop.yml b/.rubocop.yml
index 6d1f5936..a81474ce 100644
--- a/.rubocop.yml
+++ b/.rubocop.yml
@@ -19,6 +19,8 @@ Metrics/ClassLength:
 
 Metrics/ModuleLength:
   Max: 500
+  Exclude:
+    - 'test/**/*'
 
 Metrics/MethodLength:
   Max: 50
diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb
index 0ea6eccd..b29fa123 100644
--- a/lib/redis_client/cluster.rb
+++ b/lib/redis_client/cluster.rb
@@ -25,36 +25,37 @@ def inspect
     def call(*args, **kwargs, &block)
       command = @command_builder.generate(args, kwargs)
-      @router.send_command(:call_v, command, &block)
+      send_command(:call_v, command, &block)
     end
 
     def call_v(command, &block)
       command = @command_builder.generate(command)
-      @router.send_command(:call_v, command, &block)
+      send_command(:call_v, command, &block)
     end
 
     def call_once(*args, **kwargs, &block)
      command = @command_builder.generate(args, kwargs)
-      @router.send_command(:call_once_v, command, &block)
+      send_command(:call_once_v, command, &block)
     end
 
     def call_once_v(command, &block)
       command = @command_builder.generate(command)
-      @router.send_command(:call_once_v, command, &block)
+      send_command(:call_once_v, command, &block)
     end
 
     def blocking_call(timeout, *args, **kwargs, &block)
       command = @command_builder.generate(args, kwargs)
-      @router.send_command(:blocking_call_v, command, timeout, &block)
+      send_command(:blocking_call_v, command, timeout, &block)
     end
 
     def blocking_call_v(timeout, command, &block)
       command = @command_builder.generate(command)
-      @router.send_command(:blocking_call_v, command, timeout, &block)
+      send_command(:blocking_call_v, command, timeout, &block)
     end
 
     def scan(*args, **kwargs, &block)
       raise ArgumentError, 'block required' unless block
+      raise ::RedisClient::Cluster::Transaction::ConsistencyError, 'scan is not valid inside a transaction' if @transaction
 
       seed = Random.new_seed
       cursor = ZERO_CURSOR_FOR_SCAN
@@ -66,17 +67,17 @@ def scan(*args, **kwargs, &block)
     end
 
     def sscan(key, *args, **kwargs, &block)
-      node = @router.assign_node(['SSCAN', key])
+      node = assign_node(['SSCAN', key])
       @router.try_delegate(node, :sscan, key, *args, **kwargs, &block)
     end
 
     def hscan(key, *args, **kwargs, &block)
-      node = @router.assign_node(['HSCAN', key])
+      node = assign_node(['HSCAN', key])
       @router.try_delegate(node, :hscan, key, *args, **kwargs, &block)
     end
 
     def zscan(key, *args, **kwargs, &block)
-      node = @router.assign_node(['ZSCAN', key])
+      node = assign_node(['ZSCAN', key])
       @router.try_delegate(node, :zscan, key, *args, **kwargs, &block)
     end
 
@@ -90,7 +91,21 @@ def pipelined
     end
 
     def multi(watch: nil, &block)
-      ::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block)
+      # This is tricky and important.
+      # If you call #multi _outside_ of a watch block, you expect that the connection
+      # has returned to its original state at the end of the block; what was watched with the watch:
+      # kwarg is unwatched if the transaction is not committed.
+      # However, if you call #multi in a watch block (from Redis::Cluster), Redis::Cluster#watch actually
+      # calls unwatch if an exception gets thrown, _including an exception from inside a multi block_.
+      # So, we need to record whether a transaction already existed before calling multi; if so, we leave
+      # responsibility for disposing of that transaction to the caller who created it.
+      transaction_was_preexisting = !@transaction.nil?
+      build_transaction!
+      begin
+        @transaction.multi(watch: watch, &block)
+      ensure
+        @transaction = nil if @transaction&.complete? || !transaction_was_preexisting
+      end
     end
 
     def pubsub
@@ -109,7 +124,7 @@ def method_missing(name, *args, **kwargs, &block)
       if @router.command_exists?(name)
         args.unshift(name)
         command = @command_builder.generate(args, kwargs)
-        return @router.send_command(:call_v, command, &block)
+        return send_command(:call_v, command, &block)
       end
 
       super
@@ -120,5 +135,28 @@ def respond_to_missing?(name, include_private = false)
       super
     end
+
+    def build_transaction!
+      return if @transaction
+
+      @transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder)
+    end
+
+    def send_command(method, command, *args, &block)
+      build_transaction! if ::RedisClient::Cluster::Transaction.command_starts_transaction?(command)
+      if @transaction
+        begin
+          @transaction.send_command(method, command, *args, &block)
+        ensure
+          @transaction = nil if @transaction&.complete?
+        end
+      else
+        @router.send_command(method, command, *args, &block)
+      end
+    end
+
+    def assign_node(command)
+      @transaction ? @transaction.node : @router.assign_node(command)
+    end
   end
 end
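Reviewer note: the flow the new watch-aware `#multi` enables from calling code looks roughly like this. A minimal sketch of optimistic locking via `#multi(watch:)` - the node list and key names are placeholders, not part of this change:

```ruby
client = RedisClient.cluster(nodes: %w[redis://127.0.0.1:6379]).new_client

result = client.multi(watch: ['{counter}key']) do |txn|
  # Reads go through the normal client, on the connection that ran WATCH.
  current = client.call('GET', '{counter}key').to_i
  # Writes are buffered on txn and flushed as MULTI ... EXEC on the same node.
  txn.call('SET', '{counter}key', (current + 1).to_s)
end

# result is the array of EXEC replies, or nil if a concurrent write to the
# watched key aborted the transaction - callers would typically retry then.
```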
diff --git a/lib/redis_client/cluster/command.rb b/lib/redis_client/cluster/command.rb
index 05460482..90187cc6 100644
--- a/lib/redis_client/cluster/command.rb
+++ b/lib/redis_client/cluster/command.rb
@@ -11,10 +11,14 @@ class Command
       LEFT_BRACKET = '{'
       RIGHT_BRACKET = '}'
       EMPTY_HASH = {}.freeze
+      EMPTY_ARRAY = [].freeze
+      PRIMARY_COMMANDS = %w[watch unwatch multi exec discard].freeze
 
       Detail = Struct.new(
         'RedisCommand',
         :first_key_position,
+        :last_key_position,
+        :key_step,
         :write?,
         :readonly?,
         keyword_init: true
@@ -50,6 +54,8 @@ def parse_command_reply(rows)
           acc[row[0].downcase] = ::RedisClient::Cluster::Command::Detail.new(
             first_key_position: row[3],
+            last_key_position: row[4],
+            key_step: row[5],
             write?: row[2].include?('write'),
             readonly?: row[2].include?('readonly')
           )
@@ -70,9 +76,20 @@ def extract_first_key(command)
         hash_tag.empty? ? key : hash_tag
       end
 
+      def extract_all_keys(command)
+        keys_start = determine_first_key_position(command)
+        keys_end = determine_last_key_position(command, keys_start)
+        keys_step = determine_key_step(command)
+        return EMPTY_ARRAY if [keys_start, keys_end, keys_step].any?(&:zero?)
+
+        keys_end = [keys_end, command.size - 1].min
+        # use .. inclusive range because keys_end is a valid index.
+        (keys_start..keys_end).step(keys_step).map { |i| command[i] }
+      end
+
       def should_send_to_primary?(command)
         name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
-        @commands[name]&.write?
+        @commands[name]&.write? || PRIMARY_COMMANDS.include?(name)
       end
 
       def should_send_to_replica?(command)
@@ -101,6 +118,41 @@ def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticCo
         end
       end
 
+      # IMPORTANT: this determines the last key position INCLUSIVE of the last key -
+      # i.e. command[determine_last_key_position(command)] is a key.
+      # This is in line with what Redis returns from COMMAND.
+      def determine_last_key_position(command, keys_start) # rubocop:disable Metrics/AbcSize
+        case name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
+        when 'eval', 'evalsha', 'zinterstore', 'zunionstore'
+          # EVALSHA sha1 numkeys [key [key ...]] [arg [arg ...]]
+          # ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE <SUM | MIN | MAX>]
+          command[2].to_i + 2
+        when 'object', 'memory'
+          # OBJECT [ENCODING | FREQ | IDLETIME | REFCOUNT] key
+          # MEMORY USAGE key [SAMPLES count]
+          keys_start
+        when 'migrate'
+          # MIGRATE host port destination-db timeout [COPY] [REPLACE] [AUTH password | AUTH2 username password] [KEYS key [key ...]]
+          command[3].empty? ? (command.length - 1) : 3
+        when 'xread', 'xreadgroup'
+          # XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
+          keys_start + ((command.length - keys_start) / 2) - 1
+        else
+          # If there is a fixed, non-variable number of keys, don't iterate past that.
+          if @commands[name].last_key_position >= 0
+            @commands[name].last_key_position
+          else
+            command.length - 1
+          end
+        end
+      end
+
+      def determine_key_step(command)
+        name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
+        # Some commands like EVALSHA have zero as the step in the COMMAND reply somehow.
+        @commands[name].key_step == 0 ? 1 : @commands[name].key_step
+      end
+
       def determine_optional_key_position(command, option_name) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
         idx = command&.flatten&.map(&:to_s)&.map(&:downcase)&.index(option_name&.downcase)
         idx.nil? ? 0 : idx + 1
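As context for the key-position plumbing above: a worked illustration (not from the diff) of how the three values from a COMMAND reply row combine to enumerate keys. MSET's reported positions are standard Redis behaviour; the rest is plain Ruby:

```ruby
# For MSET, Redis reports first key = 1, last key = -1, step = 2.
command = %w[MSET foo val1 bar val2 baz val3]

keys_start = 1                # index of the first key
keys_end   = command.size - 1 # a last_key_position of -1 resolves to the end
key_step   = 2                # keys alternate with values

keys = (keys_start..keys_end).step(key_step).map { |i| command[i] }
keys # => ["foo", "bar", "baz"]
```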
diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb
index 11c6e22b..3a1df994 100644
--- a/lib/redis_client/cluster/router.rb
+++ b/lib/redis_client/cluster/router.rb
@@ -83,56 +83,52 @@ def try_send(node, method, command, args, retry_count: 3, &block) # rubocop:disa
     rescue ::RedisClient::CircuitBreaker::OpenCircuitError
       raise
     rescue ::RedisClient::CommandError => e
-      raise if retry_count <= 0
-
       if e.message.start_with?('MOVED')
         node = assign_redirection_node(e.message)
         retry_count -= 1
-        retry
+        retry if retry_count >= 0
       elsif e.message.start_with?('ASK')
         node = assign_asking_node(e.message)
         node.call('ASKING')
         retry_count -= 1
-        retry
+        retry if retry_count >= 0
       elsif e.message.start_with?('CLUSTERDOWN Hash slot not served')
         update_cluster_info!
         retry_count -= 1
-        retry
-      else
-        raise
+        retry if retry_count >= 0
       end
+
+      raise
     rescue ::RedisClient::ConnectionError => e
       raise if METHODS_FOR_BLOCKING_CMD.include?(method) && e.is_a?(RedisClient::ReadTimeoutError)
-      raise if retry_count <= 0
 
       update_cluster_info!
+
+      raise if retry_count <= 0
+
       retry_count -= 1
       retry
     end
 
-    def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block) # rubocop:disable Metrics/AbcSize
+    def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
       node.public_send(method, *args, **kwargs, &block)
     rescue ::RedisClient::CircuitBreaker::OpenCircuitError
       raise
     rescue ::RedisClient::CommandError => e
-      raise if retry_count <= 0
-
       if e.message.start_with?('MOVED')
         node = assign_redirection_node(e.message)
         retry_count -= 1
-        retry
+        retry if retry_count >= 0
      elsif e.message.start_with?('ASK')
        node = assign_asking_node(e.message)
        node.call('ASKING')
        retry_count -= 1
-        retry
+        retry if retry_count >= 0
      elsif e.message.start_with?('CLUSTERDOWN Hash slot not served')
        update_cluster_info!
        retry_count -= 1
-        retry
-      else
-        raise
+        retry if retry_count >= 0
      end
+
+      raise
    rescue ::RedisClient::ConnectionError
      raise if retry_count <= 0
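For reviewers unfamiliar with the redirection protocol these retry branches handle, the error strings look like this (slot number and address illustrative). A MOVED error means the slot has permanently migrated; ASK means it is mid-migration and only this one request should go to the new node, after an ASKING command:

```ruby
moved_message = 'MOVED 3999 127.0.0.1:6381'
ask_message   = 'ASK 3999 127.0.0.1:6381'

slot, node_key = moved_message.split[1, 2]
slot     # => "3999"
node_key # => "127.0.0.1:6381" - what assign_redirection_node resolves to a client
```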
diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb
index 7302a283..d4773813 100644
--- a/lib/redis_client/cluster/transaction.rb
+++ b/lib/redis_client/cluster/transaction.rb
@@ -6,51 +6,152 @@ class RedisClient
   class Cluster
     class Transaction
       ConsistencyError = Class.new(::RedisClient::Error)
+      STARTING_COMMANDS = %w[watch].freeze
+      COMPLETING_COMMANDS = %w[exec discard unwatch].freeze
+
+      def self.command_starts_transaction?(command)
+        STARTING_COMMANDS.include?(::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command))
+      end
+
+      def self.command_ends_transaction?(command)
+        COMPLETING_COMMANDS.include?(::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command))
+      end
 
       def initialize(router, command_builder)
         @router = router
         @command_builder = command_builder
         @node_key = nil
+        @node = nil
+        @pool = nil
+        @state = :unstarted
+      end
+
+      attr_reader :node
+
+      def send_command(method, command, *args, &block)
+        # Force-disable retries in transactions
+        method = make_method_once(method)
+
+        # redis-rb wants to do this when it probably doesn't need to.
+        cmd = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
+        if complete?
+          return if COMPLETING_COMMANDS.include?(cmd)
+
+          raise ArgumentError, 'Transaction is already complete'
+        end
+
+        begin
+          ensure_same_node command
+          ret = @router.try_send(@node, method, command, args, retry_count: 0, &block)
+        rescue ::RedisClient::ConnectionError
+          mark_complete # Abort transaction on connection errors
+          raise
+        rescue StandardError
+          # Abort transaction if the first command is a failure.
+          mark_complete if @state == :unstarted
+          raise
+        else
+          @state = :in_progress
+          mark_complete if COMPLETING_COMMANDS.include?(cmd)
+          ret
+        end
       end
 
-      def call(*command, **kwargs, &_)
-        command = @command_builder.generate(command, kwargs)
-        ensure_node_key(command)
+      def complete?
+        # The router closes the node on ConnectionError.
+        mark_complete if @state != :complete && @node && !@node.connected?
+        @state == :complete
       end
 
-      def call_v(command, &_)
-        command = @command_builder.generate(command)
-        ensure_node_key(command)
+      def multi(watch: nil) # rubocop:disable Metrics/AbcSize
+        send_command(:call_once_v, ['WATCH', *watch]) if watch&.any?
+
+        begin
+          command_buffer = MultiBuffer.new
+          yield command_buffer
+          return [] unless command_buffer.commands.any?
+
+          command_buffer.commands.each { |command| ensure_same_node(command) }
+          res = execute_commands_pipelined(command_buffer.commands)
+          # Because we directly called MULTI ... EXEC on the underlying node through the pipeline,
+          # and not through send_command, we have to mark the transaction complete ourselves.
+          mark_complete
+          res.last
+        ensure
+          send_command(:call_once_v, ['UNWATCH']) if watch&.any? && !complete?
+        end
       end
 
-      def call_once(*command, **kwargs, &_)
-        command = @command_builder.generate(command, kwargs)
-        ensure_node_key(command)
+      def ensure_same_node(command)
+        node_key = @router.find_primary_node_key(command)
+        if node_key.nil?
+          # If some previous command worked out what node to use, this is OK.
+          raise ConsistencyError, "Client couldn't determine the node to be executed the transaction by: #{command}" unless @node
+        elsif @node.nil?
+          # This is the first node key we've seen
+          @node = @router.find_node(node_key)
+          @node_key = node_key
+          if @node.respond_to?(:pool, true)
+            # This is a hack, but we need to check out a connection from the pool and keep it checked out until we unwatch,
+            # if we're using a Pooled backend. Otherwise, we might not run commands on the same session we watched on.
+            # Note that ConnectionPool keeps a reference to this connection in a thread-local, so we don't need to actually _use_ it
+            # explicitly; it'll get returned from #with like normal.
+            @pool = @node.send(:pool)
+            @node = @pool.checkout
+          end
+        elsif node_key != @node_key
+          raise ConsistencyError, "The transaction should be done for single node: #{@node_key}, #{node_key}"
+        end
       end
 
-      def call_once_v(command, &_)
-        command = @command_builder.generate(command)
-        ensure_node_key(command)
+      private
+
+      def mark_complete
+        @pool&.checkin
+        @pool = nil
+        @node = nil
+        @state = :complete
       end
 
-      def execute(watch: nil, &block)
-        yield self
-        raise ArgumentError, 'empty transaction' if @node_key.nil?
+      def make_method_once(method)
+        case method
+        when :call
+          :call_once
+        when :call_v
+          :call_once_v
+        else
+          method
+        end
+      end
 
-        node = @router.find_node(@node_key)
-        @router.try_delegate(node, :multi, watch: watch, &block)
+      def execute_commands_pipelined(commands)
+        @router.try_delegate(@node, :pipelined, retry_count: 0) do |p|
+          p.call_once_v ['MULTI']
+          commands.each do |command|
+            p.call_once_v command
+          end
+          p.call_once_v ['EXEC']
+        end
       end
 
-      private
+      class MultiBuffer
+        def initialize
+          @commands = []
+        end
 
-      def ensure_node_key(command)
-        node_key = @router.find_primary_node_key(command)
-        raise ConsistencyError, "Client couldn't determine the node to be executed the transaction by: #{command}" if node_key.nil?
+        def call(*command, **_kwargs, &_)
+          @commands << command
+          nil
+        end
 
-        @node_key ||= node_key
-        raise ConsistencyError, "The transaction should be done for single node: #{@node_key}, #{node_key}" if node_key != @node_key
+        def call_v(command, &_)
+          @commands << command
+          nil
+        end
 
-        nil
+        alias call_once call
+        alias call_once_v call_v
+        attr_accessor :commands
       end
     end
   end
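A sketch (names invented for illustration) of the buffering trick `MultiBuffer` implements: the object yielded to the `#multi` block only records commands, so the whole transaction can be replayed afterwards as one MULTI ... EXEC pipeline against the single node the keys resolved to:

```ruby
class RecordingBuffer
  attr_reader :commands

  def initialize
    @commands = []
  end

  def call(*command)
    @commands << command
    nil # no reply yet - like the QUEUED responses inside a real MULTI
  end
end

buffer = RecordingBuffer.new
buffer.call('SET', '{hash}key1', 'value1')
buffer.call('SET', '{hash}key2', 'value2')

# Roughly what execute_commands_pipelined then puts on the wire:
[['MULTI'], *buffer.commands, ['EXEC']]
# #multi returns the EXEC reply (res.last): the per-command results on success,
# or nil when a watched key changed and the transaction aborted.
```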
diff --git a/test/cluster_controller.rb b/test/cluster_controller.rb
index ca2d4902..5dc4047a 100644
--- a/test/cluster_controller.rb
+++ b/test/cluster_controller.rb
@@ -245,6 +245,11 @@ def select_sacrifice_of_replica
     rows.select(&:replica?).sample.client
   end
 
+  def select_sacrifice_by_slot(slot)
+    rows = associate_with_clients_and_nodes(@clients)
+    rows.find { |r| r.primary? && r.include_slot?(slot) }.client
+  end
+
   def close
     @clients.each do |client|
       client.close
diff --git a/test/command_capture_middleware.rb b/test/command_capture_middleware.rb
new file mode 100644
index 00000000..362e187e
--- /dev/null
+++ b/test/command_capture_middleware.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+
+module CommandCaptureMiddleware
+  CapturedCommand = Struct.new(:server_url, :command, :pipelined, keyword_init: true) do
+    def inspect
+      "#<#{self.class.name} [on #{server_url}] #{command.join(' ')} >"
+    end
+  end
+
+  def call(command, redis_config)
+    redis_config.custom[:captured_commands] << CapturedCommand.new(
+      server_url: redis_config.server_url,
+      command: command,
+      pipelined: false
+    )
+    super
+  end
+
+  def call_pipelined(commands, redis_config)
+    commands.map do |command|
+      redis_config.custom[:captured_commands] << CapturedCommand.new(
+        server_url: redis_config.server_url,
+        command: command,
+        pipelined: true
+      )
+    end
+    super
+  end
+end
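For reference, this is how the tests below wire the middleware in: `RedisClient` middlewares observe each command together with the config it ran under, and the shared capture buffer rides along in the `custom:` hash (node URL is a placeholder):

```ruby
captured = []

client = RedisClient.cluster(
  nodes: %w[redis://127.0.0.1:6379],
  middlewares: [CommandCaptureMiddleware],
  custom: { captured_commands: captured }
).new_client

client.call('SET', 'foo', 'bar')
captured.map(&:command) # => includes ["SET", "foo", "bar"]
```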
diff --git a/test/redis_client/cluster/test_command.rb b/test/redis_client/cluster/test_command.rb
index b34bb2ae..02710000 100644
--- a/test/redis_client/cluster/test_command.rb
+++ b/test/redis_client/cluster/test_command.rb
@@ -49,20 +49,20 @@ def test_parse_command_reply
         [
           {
             rows: [
-              ['get', 2, Set['readonly', 'fast'], 1, 1, 1, Set['@read', '@string', '@fast'], Set[], Set[], Set[]],
-              ['set', -3, Set['write', 'denyoom', 'movablekeys'], 1, 1, 1, Set['@write', '@string', '@slow'], Set[], Set[], Set[]]
+              ['get', 2, Set['readonly', 'fast'], 1, -1, 1, Set['@read', '@string', '@fast'], Set[], Set[], Set[]],
+              ['set', -3, Set['write', 'denyoom', 'movablekeys'], 1, -1, 2, Set['@write', '@string', '@slow'], Set[], Set[], Set[]]
            ],
            want: {
-              'get' => { first_key_position: 1, write?: false, readonly?: true },
-              'set' => { first_key_position: 1, write?: true, readonly?: false }
+              'get' => { first_key_position: 1, last_key_position: -1, key_step: 1, write?: false, readonly?: true },
+              'set' => { first_key_position: 1, last_key_position: -1, key_step: 2, write?: true, readonly?: false }
            }
          },
          {
            rows: [
-              ['GET', 2, Set['readonly', 'fast'], 1, 1, 1, Set['@read', '@string', '@fast'], Set[], Set[], Set[]]
+              ['GET', 2, Set['readonly', 'fast'], 1, -1, 1, Set['@read', '@string', '@fast'], Set[], Set[], Set[]]
            ],
            want: {
-              'get' => { first_key_position: 1, write?: false, readonly?: true }
+              'get' => { first_key_position: 1, last_key_position: -1, key_step: 1, write?: false, readonly?: true }
            }
          },
          { rows: [[]], want: {} },
@@ -103,9 +103,9 @@ def test_should_send_to_primary?
         [
           { command: %w[SET foo 1], want: true },
           { command: %w[GET foo], want: false },
-          { command: %w[UNKNOWN foo bar], want: nil },
-          { command: [], want: nil },
-          { command: nil, want: nil }
+          { command: %w[UNKNOWN foo bar], want: false },
+          { command: [], want: false },
+          { command: nil, want: false }
         ].each_with_index do |c, idx|
           msg = "Case: #{idx}"
           got = cmd.should_send_to_primary?(c[:command])
@@ -212,6 +212,36 @@ def test_extract_hash_tag
           assert_equal(c[:want], got, msg)
         end
       end
+
+      def test_extract_all_keys
+        cmd = ::RedisClient::Cluster::Command.load(@raw_clients)
+        [
+          { command: ['EVAL', 'return ARGV[1]', '0', 'hello'], want: [] },
+          { command: ['EVAL', 'return ARGV[1]', '3', 'key1', 'key2', 'key3', 'arg1', 'arg2'], want: %w[key1 key2 key3] },
+          { command: [['EVAL'], '"return ARGV[1]"', 0, 'hello'], want: [] },
+          { command: %w[EVALSHA sha1 2 foo bar baz zap], want: %w[foo bar] },
+          { command: %w[MIGRATE host port key 0 5 COPY], want: %w[key] },
+          { command: ['MIGRATE', 'host', 'port', '', '0', '5', 'COPY', 'KEYS', 'key1'], want: %w[key1] },
+          { command: ['MIGRATE', 'host', 'port', '', '0', '5', 'COPY', 'KEYS', 'key1', 'key2'], want: %w[key1 key2] },
+          { command: %w[ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3], want: %w[zset1 zset2] },
+          { command: %w[ZUNIONSTORE out 2 zset1 zset2 WEIGHTS 2 3], want: %w[zset1 zset2] },
+          { command: %w[OBJECT HELP], want: [] },
+          { command: %w[MEMORY HELP], want: [] },
+          { command: %w[MEMORY USAGE key], want: %w[key] },
+          { command: %w[XREAD COUNT 2 STREAMS mystream writers 0-0 0-0], want: %w[mystream writers] },
+          { command: %w[XREADGROUP GROUP group consumer STREAMS key id], want: %w[key] },
+          { command: %w[SET foo 1], want: %w[foo] },
+          { command: %w[set foo 1], want: %w[foo] },
+          { command: [['SET'], 'foo', 1], want: %w[foo] },
+          { command: %w[GET foo], want: %w[foo] },
+          { command: %w[MGET foo bar baz], want: %w[foo bar baz] },
+          { command: %w[MSET foo val bar val baz val], want: %w[foo bar baz] }
+        ].each_with_index do |c, idx|
+          msg = "Case: #{idx}"
+          got = cmd.send(:extract_all_keys, c[:command])
+          assert_equal(c[:want], got, msg)
+        end
+      end
     end
   end
 end
diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb
index 5fdb5e72..9c366da8 100644
--- a/test/redis_client/test_cluster.rb
+++ b/test/redis_client/test_cluster.rb
@@ -6,9 +6,11 @@ class RedisClient
   class TestCluster
     module Mixin
       def setup
+        @captured_commands = []
        @client = new_test_client
        @client.call('FLUSHDB')
        wait_for_replication
+        @captured_commands.clear
      end
 
      def teardown
@@ -207,7 +209,7 @@ def test_transaction_with_multiple_key
      end
 
      def test_transaction_with_empty_block
-        assert_raises(ArgumentError) { @client.multi {} }
+        assert_empty(@client.multi {})
        assert_raises(LocalJumpError) { @client.multi }
      end
 
@@ -251,6 +253,215 @@ def test_transaction_without_hashtag
        end
      end
 
+      def test_transaction_with_empty_watch
+        got = @client.multi(watch: []) do |t|
+          t.call('SET', '{hash}key1', 'value1')
+          t.call('SET', '{hash}key2', 'value2')
+        end
+
+        assert_equal(%w[OK OK], got)
+        assert_equal(%w[value1 value2], @client.call('MGET', '{hash}key1', '{hash}key2'))
+      end
+
+      def test_transaction_with_cross_slot_watch
+        assert_raises(::RedisClient::CommandError, 'CROSSSLOT keys') do
+          @client.multi(watch: Array.new(20) { |i| "key#{i}" }) {}
+        end
+      end
+
+      def test_transaction_with_successful_watch
+        @client.call('SET', 'key', '1')
+        got = @client.multi(watch: ['key']) do |t|
+          old_value = @client.call('GET', 'key').to_i
+          t.call('SET', 'key', (old_value + 1).to_s)
+        end
+
+        assert_equal(%w[OK], got)
+        assert_equal('2', @client.call('GET', 'key'))
+      end
+
+      def test_transaction_does_not_unwatch_on_commit
+        @client.call('SET', 'key', '1')
+        @captured_commands.clear
+
+        @client.multi(watch: ['key']) do |t|
+          old_value = @client.call('GET', 'key').to_i
+          t.call('SET', 'key', (old_value + 1).to_s)
+        end
+
+        assert_equal([
+          %w[WATCH key],
+          %w[GET key],
+          %w[MULTI],
+          %w[SET key 2],
+          %w[EXEC]
+        ], @captured_commands.map(&:command))
+      end
+
+      def test_transaction_unwatches_on_abort
+        @client.call('SET', 'key', '1')
+        @captured_commands.clear
+
+        @client.multi(watch: ['key']) do |_t|
+          @client.call('GET', 'key')
+        end
+
+        assert_equal([
+          %w[WATCH key],
+          %w[GET key],
+          %w[UNWATCH]
+        ], @captured_commands.map(&:command))
+      end
+
+      def test_transaction_unwatches_on_exception
+        @client.call('SET', 'key', '1')
+        @captured_commands.clear
+
+        assert_raises(StandardError) do
+          @client.multi(watch: ['key']) do |_t|
+            @client.call('GET', 'key')
+            raise StandardError, 'boom'
+          end
+        end
+
+        assert_equal([
+          %w[WATCH key],
+          %w[GET key],
+          %w[UNWATCH]
+        ], @captured_commands.map(&:command))
+      end
+
+      def test_transaction_executes_on_primary
+        keys = (1..20).map { |k| "{hash}key#{k}" }
+        keys.each { |key| @client.call('SET', key, '1') }
+        primary_url_for_slot = @captured_commands.last.server_url
+        @captured_commands.clear
+
+        @client.multi(watch: keys) do |_t|
+          keys.each { |key| @client.call('GET', key) }
+        end
+
+        @captured_commands.each do |cmd|
+          assert_equal(primary_url_for_slot, cmd.server_url)
+        end
+      end
+
+      def test_implicit_transaction_when_watching
+        @client.call('SET', '{slot}key1', 'value1')
+        @client.call('SET', '{slot}key2', 'value2')
+        @captured_commands.clear
+
+        @client.call_v(['WATCH', '{slot}key1'])
+        @client.call_v(['WATCH', '{slot}key2'])
+        old_value1 = @client.call_v(['GET', '{slot}key1'])
+        old_value2 = @client.call_v(['GET', '{slot}key2'])
+        ret = @client.multi do |txn|
+          txn.call_v(['SET', '{slot}key1', old_value2])
+          txn.call_v(['SET', '{slot}key2', old_value1])
+        end
+
+        assert_equal(%w[OK OK], ret)
+        assert_equal([
+          %w[WATCH {slot}key1],
+          %w[WATCH {slot}key2],
+          %w[GET {slot}key1],
+          %w[GET {slot}key2],
+          %w[MULTI],
+          %w[SET {slot}key1 value2],
+          %w[SET {slot}key2 value1],
+          %w[EXEC]
+        ], @captured_commands.map(&:command))
+        assert_equal('value2', @client.call('GET', '{slot}key1'))
+        assert_equal('value1', @client.call('GET', '{slot}key2'))
+      end
+
+      def test_implicit_transaction_abort
+        other_client = new_test_client(capture_buffer: [])
+        @client.call('SET', '{slot}key1', 'value1')
+        @client.call('SET', '{slot}key2', 'value2')
+        @captured_commands.clear
+
+        @client.call_v(['WATCH', '{slot}key1'])
+        @client.call_v(['WATCH', '{slot}key2'])
+        old_value1 = @client.call_v(['GET', '{slot}key1'])
+        old_value2 = @client.call_v(['GET', '{slot}key2'])
+        other_client.call_v(['SET', '{slot}key1', 'some_conflicting_value'])
+        ret = @client.multi do |txn|
+          txn.call_v(['SET', '{slot}key1', old_value2])
+          txn.call_v(['SET', '{slot}key2', old_value1])
+        end
+
+        assert_nil(ret)
+        assert_equal('some_conflicting_value', @client.call('GET', '{slot}key1'))
+        assert_equal('value2', @client.call('GET', '{slot}key2'))
+      end
+
+      def test_implicit_transaction_unwatch
+        @client.call_v(%w[WATCH key1])
+        ret = @client.call_v(['UNWATCH'])
+        assert_equal('OK', ret)
+      end
+
+      def test_implicit_transaction_get_conflicting_keys
+        @client.call_v(['WATCH', '{slot1}key1'])
+        assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
+          @client.call_v(['GET', '{slot2}key1'])
+        end
+        # It's still OK to unwatch; this is what redis-rb will do if this kind of cross-slot read
+        # is attempted inside a watch do ... end block.
+        ret = @client.call_v(['UNWATCH'])
+        assert_equal('OK', ret)
+      end
+
+      def test_connection_can_be_used_after_transaction
+        @client.multi do |txn|
+          txn.call_v(['SET', '{slota}key1', 'hello'])
+        end
+
+        assert_equal('OK', @client.call_v(['SET', '{slotb}key2', 'hello']))
+        assert_equal('OK', @client.call_v(['SET', '{slotc}key3', 'hello']))
+      end
+
+      def test_multiple_implicit_transactions
+        swap_keys_txn = lambda { |key1, key2|
+          @client.call('SET', key1, 'value1')
+          @client.call('SET', key2, 'value2')
+          @client.call_v(['WATCH', key1])
+          @client.call_v(['WATCH', key2])
+          old_value1 = @client.call_v(['GET', key1])
+          old_value2 = @client.call_v(['GET', key2])
+          @client.multi do |txn|
+            txn.call_v(['SET', key1, old_value2])
+            txn.call_v(['SET', key2, old_value1])
+          end
+        }
+
+        ret = swap_keys_txn.call('{slota}key1', '{slota}key2')
+        assert_equal(%w[OK OK], ret)
+        assert_equal('value2', @client.call('GET', '{slota}key1'))
+        assert_equal('value1', @client.call('GET', '{slota}key2'))
+
+        # A second transaction, on the same node
+        ret = swap_keys_txn.call('{slota}key3', '{slota}key4')
+        assert_equal(%w[OK OK], ret)
+        assert_equal('value2', @client.call('GET', '{slota}key3'))
+        assert_equal('value1', @client.call('GET', '{slota}key4'))
+
+        # A third transaction, on a different node
+        ret = swap_keys_txn.call('{slotb}key5', '{slotb}key6')
+        assert_equal(%w[OK OK], ret)
+        assert_equal('value2', @client.call('GET', '{slotb}key5'))
+        assert_equal('value1', @client.call('GET', '{slotb}key6'))
+      end
+
+      def test_bare_unwatch
+        assert_raises(RedisClient::Cluster::AmbiguousNodeError) do
+          @client.call_v(['UNWATCH'])
+        end
+      end
+
      def test_pubsub_without_subscription
        pubsub = @client.pubsub
        assert_nil(pubsub.next_event(0.01))
@@ -522,10 +733,12 @@ def hiredis_used?
     class PrimaryOnly < TestingWrapper
       include Mixin
 
-      def new_test_client
+      def new_test_client(capture_buffer: @captured_commands)
        config = ::RedisClient::ClusterConfig.new(
          nodes: TEST_NODE_URIS,
          fixed_hostname: TEST_FIXED_HOSTNAME,
+          middlewares: [CommandCaptureMiddleware],
+          custom: { captured_commands: capture_buffer },
          **TEST_GENERIC_OPTIONS
        )
        ::RedisClient::Cluster.new(config)
@@ -535,12 +748,14 @@ def new_test_client
     class ScaleReadRandom < TestingWrapper
       include Mixin
 
-      def new_test_client
+      def new_test_client(capture_buffer: @captured_commands)
        config = ::RedisClient::ClusterConfig.new(
          nodes: TEST_NODE_URIS,
          replica: true,
          replica_affinity: :random,
          fixed_hostname: TEST_FIXED_HOSTNAME,
+          middlewares: [CommandCaptureMiddleware],
+          custom: { captured_commands: capture_buffer },
          **TEST_GENERIC_OPTIONS
        )
        ::RedisClient::Cluster.new(config)
@@ -550,12 +765,14 @@ def new_test_client
     class ScaleReadRandomWithPrimary < TestingWrapper
       include Mixin
 
-      def new_test_client
+      def new_test_client(capture_buffer: @captured_commands)
        config = ::RedisClient::ClusterConfig.new(
          nodes: TEST_NODE_URIS,
          replica: true,
          replica_affinity: :random_with_primary,
          fixed_hostname: TEST_FIXED_HOSTNAME,
+          middlewares: [CommandCaptureMiddleware],
+          custom: { captured_commands: capture_buffer },
          **TEST_GENERIC_OPTIONS
        )
        ::RedisClient::Cluster.new(config)
@@ -565,12 +782,14 @@ def new_test_client
     class ScaleReadLatency < TestingWrapper
       include Mixin
 
-      def new_test_client
+      def new_test_client(capture_buffer: @captured_commands)
        config = ::RedisClient::ClusterConfig.new(
          nodes: TEST_NODE_URIS,
          replica: true,
          replica_affinity: :latency,
          fixed_hostname: TEST_FIXED_HOSTNAME,
+          middlewares: [CommandCaptureMiddleware],
+          custom: { captured_commands: capture_buffer },
          **TEST_GENERIC_OPTIONS
        )
        ::RedisClient::Cluster.new(config)
@@ -580,14 +799,48 @@ def new_test_client
     class Pooled < TestingWrapper
       include Mixin
 
-      def new_test_client
+      def new_test_client(capture_buffer: @captured_commands)
        config = ::RedisClient::ClusterConfig.new(
          nodes: TEST_NODE_URIS,
          fixed_hostname: TEST_FIXED_HOSTNAME,
+          middlewares: [CommandCaptureMiddleware],
+          custom: { captured_commands: capture_buffer },
          **TEST_GENERIC_OPTIONS
        )
        ::RedisClient::Cluster.new(config, pool: { timeout: TEST_TIMEOUT_SEC, size: 2 })
      end
+
+      # This test only makes sense for pooled connections; each fiber should check out
+      # a different connection
+      def test_two_concurrent_transactions
+        # This actually can't work unless we store the current transaction for a RedisClient::Cluster
+        # in a fiber-local variable. The best way to do that would be to add a dependency on concurrent-ruby,
+        # since otherwise fiber-local variables are prone to collisions and require finalizers to properly
+        # avoid leaks.
+        skip 'requires dependency on concurrent-ruby'
+
+        swap_keys_txn_factory = lambda { |key1, key2|
+          lambda {
+            @client.call('SET', key1, 'value1')
+            @client.call('SET', key2, 'value2')
+            @client.call_v(['WATCH', key1])
+            @client.call_v(['WATCH', key2])
+            old_value1 = @client.call_v(['GET', key1])
+            old_value2 = @client.call_v(['GET', key2])
+            Fiber.yield
+            @client.multi do |txn|
+              txn.call_v(['SET', key1, old_value2])
+              txn.call_v(['SET', key2, old_value1])
+            end
+          }
+        }
+
+        txn1 = Fiber.new(&swap_keys_txn_factory.call('{slota}key1', '{slota}key2'))
+        txn2 = Fiber.new(&swap_keys_txn_factory.call('{slotb}key3', '{slotb}key4'))
+        # Resume each fiber twice: once to reach Fiber.yield, once to commit.
+        [txn1, txn2, txn1, txn2].each(&:resume)
+        # [txn1, txn1, txn2, txn2].each(&:resume)
+      end
     end
   end
 end
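A hypothetical sketch of the fiber-local transaction storage the skipped test's comment alludes to; none of this is in the diff. The leak the comment warns about is visible in the never-pruned hash, which is what concurrent-ruby's properly finalized local variables would avoid:

```ruby
# Hypothetical illustration only - not part of this change.
class NaiveFiberLocalTransaction
  def initialize
    @transactions = {} # Fiber => Transaction; never pruned, hence the leak
  end

  def current
    @transactions[Fiber.current]
  end

  def current=(transaction)
    @transactions[Fiber.current] = transaction
  end
end
```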
diff --git a/test/test_against_cluster_broken.rb b/test/test_against_cluster_broken.rb
index 9ce25d9c..847fbc6d 100644
--- a/test/test_against_cluster_broken.rb
+++ b/test/test_against_cluster_broken.rb
@@ -34,8 +34,39 @@ def test_a_primary_is_down
     do_test_a_node_is_down(sacrifice, number_of_keys: 10)
   end
 
+  def test_transaction_aborts_when_node_down
+    # Need a new client for this test which isn't used in the helpers
+    test_client = ::RedisClient.cluster(
+      nodes: TEST_NODE_URIS,
+      replica: true,
+      fixed_hostname: TEST_FIXED_HOSTNAME,
+      **TEST_GENERIC_OPTIONS
+    ).new_client
+    key = 'key1'
+    slot = ::RedisClient::Cluster::KeySlotConverter.convert(key)
+    sacrifice = @controller.select_sacrifice_by_slot(slot)
+
+    test_client.call_v(['WATCH', key])
+
+    kill_a_node(sacrifice, kill_attempts: 10)
+    wait_for_cluster_to_be_ready(wait_attempts: 10)
+
+    # We don't know that it was closed yet
+    assert client_in_transaction?(test_client)
+    # It was closed
+    assert_raises(RedisClient::ConnectionError) { test_client.call_v(['GET', key]) }
+    # We're not in a transaction anymore
+    refute client_in_transaction?(test_client)
+    # So doing some other access will be fine now.
+    assert_nil(test_client.call_v(%w[GET other_key]))
+  end
+
   private
 
+  def client_in_transaction?(client)
+    !!client.instance_variable_get(:@transaction)
+  end
+
   def wait_for_replication
     client_side_timeout = TEST_TIMEOUT_SEC + 1.0
     server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
@@ -63,7 +94,7 @@ def kill_a_node(sacrifice, kill_attempts:)
     refute_nil(sacrifice, "#{sacrifice.config.host}:#{sacrifice.config.port}")
 
     loop do
-      break if kill_attempts <= 0
+      assert_operator(kill_attempts, :>=, 0)
 
       sacrifice.call('SHUTDOWN', 'NOSAVE')
     rescue ::RedisClient::CommandError => e
@@ -74,8 +105,6 @@ def kill_a_node(sacrifice, kill_attempts:)
       kill_attempts -= 1
       sleep WAIT_SEC
     end
-
-    assert_raises(::RedisClient::ConnectionError) { sacrifice.call('PING') }
   end
 
   def wait_for_cluster_to_be_ready(wait_attempts:)
diff --git a/test/test_against_cluster_state.rb b/test/test_against_cluster_state.rb
index 9e940438..f76293a7 100644
--- a/test/test_against_cluster_state.rb
+++ b/test/test_against_cluster_state.rb
@@ -13,6 +13,7 @@ def setup
         **TEST_GENERIC_OPTIONS.merge(timeout: 30.0)
       )
       @controller.rebuild
+      @capture_buffer = []
       @client = new_test_client
     end
 
@@ -59,6 +60,43 @@ def test_the_state_of_cluster_resharding_with_pipelining
       end
     end
 
+    def test_does_not_abort_transaction_on_resharding
+      key = 'key1'
+      @client.call_v(['SET', key, 'value1'])
+      slot = ::RedisClient::Cluster::KeySlotConverter.convert(key)
+      @client.call_v(['WATCH', key])
+      src, dest = @controller.select_resharding_target(slot)
+      @controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
+      @controller.finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
+
+      # This should raise, and the transaction should NOT be aborted now
+      assert client_in_transaction?(@client)
+      assert_raises(::RedisClient::CommandError, 'MOVED') { @client.call_v(['GET', key]) }
+      assert client_in_transaction?(@client)
+      # In normal usage, redis-rb would catch that exception and call #unwatch, since CommandError
+      # does not inherit from ConnectionError.
+      assert_equal('OK', @client.call_v(['UNWATCH']))
+    end
+
+    def test_does_not_abort_transaction_on_clusterdown
+      key = 'key1'
+      @client.call_v(['SET', key, 'value1'])
+      @client.call_v(['WATCH', key])
+      @controller.down
+      @controller.rebuild
+
+      # We get no indication from Redis that anything has actually gone wrong with the transaction,
+      # because we issued no command whilst it was down.
+      assert client_in_transaction?(@client)
+      @client.call_v(['GET', key])
+
+      # But if we try to commit something, it will fail - this is a guarantee provided by Redis.
+      res = @client.multi do |txn|
+        txn.call_v(['SET', key, 'hello'])
+      end
+      assert_nil(res)
+    end
+
     private
 
     def wait_for_replication
@@ -84,6 +122,10 @@ def do_resharding_test(number_of_keys: 1000)
       yield(keys)
       @controller.finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
     end
+
+    def client_in_transaction?(client)
+      !!client.instance_variable_get(:@transaction)
+    end
   end
 
   class PrimaryOnly < TestingWrapper
@@ -95,6 +137,8 @@ def new_test_client
       ::RedisClient.cluster(
         nodes: TEST_NODE_URIS,
         fixed_hostname: TEST_FIXED_HOSTNAME,
+        middlewares: [CommandCaptureMiddleware],
+        custom: { captured_commands: @capture_buffer },
         **TEST_GENERIC_OPTIONS
       ).new_client
     end
@@ -124,6 +168,8 @@ def new_test_client
         replica: true,
         replica_affinity: :random,
         fixed_hostname: TEST_FIXED_HOSTNAME,
+        middlewares: [CommandCaptureMiddleware],
+        custom: { captured_commands: @capture_buffer },
         **TEST_GENERIC_OPTIONS
       ).new_client
     end
@@ -153,6 +199,8 @@ def new_test_client
         replica: true,
         replica_affinity: :random_with_primary,
         fixed_hostname: TEST_FIXED_HOSTNAME,
+        middlewares: [CommandCaptureMiddleware],
+        custom: { captured_commands: @capture_buffer },
         **TEST_GENERIC_OPTIONS
       ).new_client
     end
@@ -182,6 +230,8 @@ def new_test_client
         replica: true,
         replica_affinity: :latency,
         fixed_hostname: TEST_FIXED_HOSTNAME,
+        middlewares: [CommandCaptureMiddleware],
+        custom: { captured_commands: @capture_buffer },
         **TEST_GENERIC_OPTIONS
       ).new_client
     end
@@ -196,6 +246,8 @@ def new_test_client
       ::RedisClient.cluster(
         nodes: TEST_NODE_URIS,
         fixed_hostname: TEST_FIXED_HOSTNAME,
+        middlewares: [CommandCaptureMiddleware],
+        custom: { captured_commands: @capture_buffer },
         **TEST_GENERIC_OPTIONS
       ).new_pool(timeout: TEST_TIMEOUT_SEC, size: 2)
     end
diff --git a/test/testing_helper.rb b/test/testing_helper.rb
index 76f02e75..4d09ff23 100644
--- a/test/testing_helper.rb
+++ b/test/testing_helper.rb
@@ -6,6 +6,7 @@
 require 'redis-cluster-client'
 require 'testing_constants'
 require 'cluster_controller'
+require 'command_capture_middleware'
 
 case ENV.fetch('REDIS_CONNECTION_DRIVER', 'ruby')
 when 'hiredis' then require 'hiredis-client'