diff --git a/.rubocop.yml b/.rubocop.yml index 6d1f5936..a81474ce 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -19,6 +19,8 @@ Metrics/ClassLength: Metrics/ModuleLength: Max: 500 + Exclude: + - 'test/**/*' Metrics/MethodLength: Max: 50 diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index 11c6e22b..38fac6d2 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -27,6 +27,7 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs) @command = ::RedisClient::Cluster::Command.load(@node.replica_clients.shuffle, slow_command_timeout: config.slow_command_timeout) @mutex = Mutex.new @command_builder = @config.command_builder + @force_primary_level = 0 end def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity @@ -174,7 +175,7 @@ def find_node_key(command, seed: nil) key = @command.extract_first_key(command) slot = key.empty? ? nil : ::RedisClient::Cluster::KeySlotConverter.convert(key) - if @command.should_send_to_primary?(command) + if @command.should_send_to_primary?(command) || @force_primary_level > 0 @node.find_node_key_of_primary(slot) || @node.any_primary_node_key(seed: seed) else @node.find_node_key_of_replica(slot, seed: seed) || @node.any_replica_node_key(seed: seed) @@ -221,6 +222,13 @@ def close @node.each(&:close) end + def force_primary + @force_primary_level += 1 + yield + ensure + @force_primary_level -= 1 + end + private def send_wait_command(method, command, args, retry_count: 3, &block) # rubocop:disable Metrics/AbcSize diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb index 7302a283..e8619d40 100644 --- a/lib/redis_client/cluster/transaction.rb +++ b/lib/redis_client/cluster/transaction.rb @@ -11,45 +11,83 @@ def initialize(router, command_builder) @router = router @command_builder = command_builder @node_key = nil + @node = nil + @transaction_commands = [] end def call(*command, **kwargs, &_) command = @command_builder.generate(command, kwargs) - ensure_node_key(command) + ensure_node(command) + @transaction_commands << command + nil end def call_v(command, &_) command = @command_builder.generate(command) - ensure_node_key(command) + ensure_node(command) + @transaction_commands << command + nil end + alias call_once call + alias call_once_v call_v - def call_once(*command, **kwargs, &_) - command = @command_builder.generate(command, kwargs) - ensure_node_key(command) + def execute(watch: nil, &block) + @router.force_primary do + if watch&.any? + execute_with_watch(watch: watch, &block) + else + execute_without_watch(&block) + end + end end - def call_once_v(command, &_) - command = @command_builder.generate(command) - ensure_node_key(command) + private + + def execute_with_watch(watch:) + # Validate that all keys to be watched are on the same node + watch.each { |key| ensure_node(['WATCH', key]) } + # n.b - wrapping this in #try_delegate means we retry the whole transaction on failure, and also + # get the recover of e.g. detecting moved slots. + @router.try_delegate(@node, :with) do |conn| + commit_result = nil + @router.try_send(conn, :call_once_v, ['WATCH', *watch], [], retry_count: 0) + begin + yield self + commit_result = @router.try_delegate(conn, :multi, retry_count: 0) do |m| + @transaction_commands.each do |cmd| + m.call_once_v(cmd) + end + end + ensure + # unwatch, except if we committed (it's unnescessary) or the connection is broken anyway (it won't work) + @router.try_send(conn, :call_once_v, ['UNWATCH'], [], retry_count: 0) unless commit_result&.any? || !conn.connected? + end + end end - def execute(watch: nil, &block) + def execute_without_watch + # We don't know what node is going to be executed on yet. yield self - raise ArgumentError, 'empty transaction' if @node_key.nil? + return [] if @node.nil? - node = @router.find_node(@node_key) - @router.try_delegate(node, :multi, watch: watch, &block) + # Now we know. Accumulate the collected commands and send them to the right connection. + @router.try_delegate(@node, :multi) do |m| + @transaction_commands.each do |cmd| + m.call_v(cmd) + end + end end - private - - def ensure_node_key(command) + def ensure_node(command) node_key = @router.find_primary_node_key(command) raise ConsistencyError, "Client couldn't determine the node to be executed the transaction by: #{command}" if node_key.nil? - @node_key ||= node_key - raise ConsistencyError, "The transaction should be done for single node: #{@node_key}, #{node_key}" if node_key != @node_key - + if @node.nil? + @node = @router.find_node(node_key) + @node_key = node_key + elsif @node_key != node_key + raise ConsistencyError, "The transaction should be done for single node: #{@node_key}, #{node_key}" if node_key != @node_key + end nil end end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 5fdb5e72..24e100df 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -6,9 +6,11 @@ class RedisClient class TestCluster module Mixin def setup + @captured_commands = [] @client = new_test_client @client.call('FLUSHDB') wait_for_replication + @captured_commands.clear end def teardown @@ -207,7 +209,7 @@ def test_transaction_with_multiple_key end def test_transaction_with_empty_block - assert_raises(ArgumentError) { @client.multi {} } + assert_empty(@client.multi {}) assert_raises(LocalJumpError) { @client.multi } end @@ -251,6 +253,99 @@ def test_transaction_without_hashtag end end + def test_transaction_with_empty_watch + got = @client.multi(watch: []) do |t| + t.call('SET', '{hash}key1', 'value1') + t.call('SET', '{hash}key2', 'value2') + end + + assert_equal(%w[OK OK], got) + assert_equal(%w[value1 value2], @client.call('MGET', '{hash}key1', '{hash}key2')) + end + + def test_transaction_with_cross_slot_watch + assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do + @client.multi(watch: Array.new(20) { |i| "key#{i}" }) {} + end + end + + def test_transaction_with_successful_watch + @client.call('SET', 'key', '1') + got = @client.multi(watch: ['key']) do |t| + old_value = @client.call('GET', 'key').to_i + t.call('SET', 'key', (old_value + 1).to_s) + end + + assert_equal(%w[OK], got) + assert_equal('2', @client.call('GET', 'key')) + end + + def test_transaction_does_not_unwatch_on_commit + @client.call('SET', 'key', '1') + @captured_commands.clear + + @client.multi(watch: ['key']) do |t| + old_value = @client.call('GET', 'key').to_i + t.call('SET', 'key', (old_value + 1).to_s) + end + + assert_equal([ + %w[WATCH key], + %w[GET key], + %w[MULTI], + %w[SET key 2], + %w[EXEC] + ], @captured_commands.map(&:command)) + end + + def test_transaction_unwatches_on_abort + @client.call('SET', 'key', '1') + @captured_commands.clear + + @client.multi(watch: ['key']) do |_t| + @client.call('GET', 'key') + end + + assert_equal([ + %w[WATCH key], + %w[GET key], + %w[UNWATCH] + ], @captured_commands.map(&:command)) + end + + def test_transaction_unwatches_on_exception + @client.call('SET', 'key', '1') + @captured_commands.clear + + assert_raises(StandardError) do + @client.multi(watch: ['key']) do |_t| + @client.call('GET', 'key') + raise StandardError, 'boom' + end + end + + assert_equal([ + %w[WATCH key], + %w[GET key], + %w[UNWATCH] + ], @captured_commands.map(&:command)) + end + + def test_transaction_executes_on_primary + keys = (1..20).map { |k| "{hash}key#{k}" } + keys.each { |key| @client.call('SET', key, '1') } + primary_url_for_slot = @captured_commands.last.server_url + @captured_commands.clear + + @client.multi(watch: keys) do |_t| + keys.each { |key| @client.call('GET', key) } + end + + @captured_commands.each do |cmd| + assert_equal(primary_url_for_slot, cmd.server_url) + end + end + def test_pubsub_without_subscription pubsub = @client.pubsub assert_nil(pubsub.next_event(0.01)) @@ -519,6 +614,34 @@ def hiredis_used? end end + module CommandCaptureMiddleware + CapturedCommand = Struct.new(:server_url, :command, :pipelined, keyword_init: true) do + def inspect + "#<#{self.class.name} [on #{server_url}] #{command.join(' ')} >" + end + end + + def call(command, redis_config) + redis_config.custom[:captured_commands] << CapturedCommand.new( + server_url: redis_config.server_url, + command: command, + pipelined: false + ) + super + end + + def call_pipelined(commands, redis_config) + commands.map do |command| + redis_config.custom[:captured_commands] << CapturedCommand.new( + server_url: redis_config.server_url, + command: command, + pipelined: true + ) + end + super + end + end + class PrimaryOnly < TestingWrapper include Mixin @@ -526,6 +649,8 @@ def new_test_client config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, fixed_hostname: TEST_FIXED_HOSTNAME, + middlewares: [CommandCaptureMiddleware], + custom: { captured_commands: @captured_commands }, **TEST_GENERIC_OPTIONS ) ::RedisClient::Cluster.new(config) @@ -541,6 +666,8 @@ def new_test_client replica: true, replica_affinity: :random, fixed_hostname: TEST_FIXED_HOSTNAME, + middlewares: [CommandCaptureMiddleware], + custom: { captured_commands: @captured_commands }, **TEST_GENERIC_OPTIONS ) ::RedisClient::Cluster.new(config) @@ -556,6 +683,8 @@ def new_test_client replica: true, replica_affinity: :random_with_primary, fixed_hostname: TEST_FIXED_HOSTNAME, + middlewares: [CommandCaptureMiddleware], + custom: { captured_commands: @captured_commands }, **TEST_GENERIC_OPTIONS ) ::RedisClient::Cluster.new(config) @@ -571,6 +700,8 @@ def new_test_client replica: true, replica_affinity: :latency, fixed_hostname: TEST_FIXED_HOSTNAME, + middlewares: [CommandCaptureMiddleware], + custom: { captured_commands: @captured_commands }, **TEST_GENERIC_OPTIONS ) ::RedisClient::Cluster.new(config) @@ -584,6 +715,8 @@ def new_test_client config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, fixed_hostname: TEST_FIXED_HOSTNAME, + middlewares: [CommandCaptureMiddleware], + custom: { captured_commands: @captured_commands }, **TEST_GENERIC_OPTIONS ) ::RedisClient::Cluster.new(config, pool: { timeout: TEST_TIMEOUT_SEC, size: 2 })