Skip to content

Fix MULTI transactions calling the block twice #294

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Metrics/ClassLength:

Metrics/ModuleLength:
Max: 500
Exclude:
- 'test/**/*'

Metrics/MethodLength:
Max: 50
Expand Down
10 changes: 9 additions & 1 deletion lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs)
@command = ::RedisClient::Cluster::Command.load(@node.replica_clients.shuffle, slow_command_timeout: config.slow_command_timeout)
@mutex = Mutex.new
@command_builder = @config.command_builder
@force_primary_level = 0
end

def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
Expand Down Expand Up @@ -174,7 +175,7 @@ def find_node_key(command, seed: nil)
key = @command.extract_first_key(command)
slot = key.empty? ? nil : ::RedisClient::Cluster::KeySlotConverter.convert(key)

if @command.should_send_to_primary?(command)
if @command.should_send_to_primary?(command) || @force_primary_level > 0
@node.find_node_key_of_primary(slot) || @node.any_primary_node_key(seed: seed)
else
@node.find_node_key_of_replica(slot, seed: seed) || @node.any_replica_node_key(seed: seed)
Expand Down Expand Up @@ -221,6 +222,13 @@ def close
@node.each(&:close)
end

def force_primary
@force_primary_level += 1
yield
ensure
@force_primary_level -= 1
end

private

def send_wait_command(method, command, args, retry_count: 3, &block) # rubocop:disable Metrics/AbcSize
Expand Down
74 changes: 56 additions & 18 deletions lib/redis_client/cluster/transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,83 @@ def initialize(router, command_builder)
@router = router
@command_builder = command_builder
@node_key = nil
@node = nil
@transaction_commands = []
end

def call(*command, **kwargs, &_)
command = @command_builder.generate(command, kwargs)
ensure_node_key(command)
ensure_node(command)
@transaction_commands << command
nil
end

def call_v(command, &_)
command = @command_builder.generate(command)
ensure_node_key(command)
ensure_node(command)
@transaction_commands << command
nil
end
alias call_once call
alias call_once_v call_v

def call_once(*command, **kwargs, &_)
command = @command_builder.generate(command, kwargs)
ensure_node_key(command)
def execute(watch: nil, &block)
@router.force_primary do
if watch&.any?
execute_with_watch(watch: watch, &block)
else
execute_without_watch(&block)
end
end
end

def call_once_v(command, &_)
command = @command_builder.generate(command)
ensure_node_key(command)
private

def execute_with_watch(watch:)
# Validate that all keys to be watched are on the same node
watch.each { |key| ensure_node(['WATCH', key]) }
# n.b - wrapping this in #try_delegate means we retry the whole transaction on failure, and also
# get the recover of e.g. detecting moved slots.
@router.try_delegate(@node, :with) do |conn|
commit_result = nil
@router.try_send(conn, :call_once_v, ['WATCH', *watch], [], retry_count: 0)
begin
yield self
commit_result = @router.try_delegate(conn, :multi, retry_count: 0) do |m|
@transaction_commands.each do |cmd|
m.call_once_v(cmd)
end
end
ensure
# unwatch, except if we committed (it's unnescessary) or the connection is broken anyway (it won't work)
@router.try_send(conn, :call_once_v, ['UNWATCH'], [], retry_count: 0) unless commit_result&.any? || !conn.connected?
end
end
end

def execute(watch: nil, &block)
def execute_without_watch
# We don't know what node is going to be executed on yet.
yield self
raise ArgumentError, 'empty transaction' if @node_key.nil?
return [] if @node.nil?

node = @router.find_node(@node_key)
@router.try_delegate(node, :multi, watch: watch, &block)
# Now we know. Accumulate the collected commands and send them to the right connection.
@router.try_delegate(@node, :multi) do |m|
@transaction_commands.each do |cmd|
m.call_v(cmd)
end
end
end

private

def ensure_node_key(command)
def ensure_node(command)
node_key = @router.find_primary_node_key(command)
raise ConsistencyError, "Client couldn't determine the node to be executed the transaction by: #{command}" if node_key.nil?

@node_key ||= node_key
raise ConsistencyError, "The transaction should be done for single node: #{@node_key}, #{node_key}" if node_key != @node_key

if @node.nil?
@node = @router.find_node(node_key)
@node_key = node_key
elsif @node_key != node_key
raise ConsistencyError, "The transaction should be done for single node: #{@node_key}, #{node_key}" if node_key != @node_key
end
nil
end
end
Expand Down
135 changes: 134 additions & 1 deletion test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ class RedisClient
class TestCluster
module Mixin
def setup
@captured_commands = []
@client = new_test_client
@client.call('FLUSHDB')
wait_for_replication
@captured_commands.clear
end

def teardown
Expand Down Expand Up @@ -207,7 +209,7 @@ def test_transaction_with_multiple_key
end

def test_transaction_with_empty_block
assert_raises(ArgumentError) { @client.multi {} }
assert_empty(@client.multi {})
assert_raises(LocalJumpError) { @client.multi }
end

Expand Down Expand Up @@ -251,6 +253,99 @@ def test_transaction_without_hashtag
end
end

def test_transaction_with_empty_watch
got = @client.multi(watch: []) do |t|
t.call('SET', '{hash}key1', 'value1')
t.call('SET', '{hash}key2', 'value2')
end

assert_equal(%w[OK OK], got)
assert_equal(%w[value1 value2], @client.call('MGET', '{hash}key1', '{hash}key2'))
end

def test_transaction_with_cross_slot_watch
assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
@client.multi(watch: Array.new(20) { |i| "key#{i}" }) {}
end
end

def test_transaction_with_successful_watch
@client.call('SET', 'key', '1')
got = @client.multi(watch: ['key']) do |t|
old_value = @client.call('GET', 'key').to_i
t.call('SET', 'key', (old_value + 1).to_s)
end

assert_equal(%w[OK], got)
assert_equal('2', @client.call('GET', 'key'))
end

def test_transaction_does_not_unwatch_on_commit
@client.call('SET', 'key', '1')
@captured_commands.clear

@client.multi(watch: ['key']) do |t|
old_value = @client.call('GET', 'key').to_i
t.call('SET', 'key', (old_value + 1).to_s)
end

assert_equal([
%w[WATCH key],
%w[GET key],
%w[MULTI],
%w[SET key 2],
%w[EXEC]
], @captured_commands.map(&:command))
end

def test_transaction_unwatches_on_abort
@client.call('SET', 'key', '1')
@captured_commands.clear

@client.multi(watch: ['key']) do |_t|
@client.call('GET', 'key')
end

assert_equal([
%w[WATCH key],
%w[GET key],
%w[UNWATCH]
], @captured_commands.map(&:command))
end

def test_transaction_unwatches_on_exception
@client.call('SET', 'key', '1')
@captured_commands.clear

assert_raises(StandardError) do
@client.multi(watch: ['key']) do |_t|
@client.call('GET', 'key')
raise StandardError, 'boom'
end
end

assert_equal([
%w[WATCH key],
%w[GET key],
%w[UNWATCH]
], @captured_commands.map(&:command))
end

def test_transaction_executes_on_primary
keys = (1..20).map { |k| "{hash}key#{k}" }
keys.each { |key| @client.call('SET', key, '1') }
primary_url_for_slot = @captured_commands.last.server_url
@captured_commands.clear

@client.multi(watch: keys) do |_t|
keys.each { |key| @client.call('GET', key) }
end

@captured_commands.each do |cmd|
assert_equal(primary_url_for_slot, cmd.server_url)
end
end

def test_pubsub_without_subscription
pubsub = @client.pubsub
assert_nil(pubsub.next_event(0.01))
Expand Down Expand Up @@ -519,13 +614,43 @@ def hiredis_used?
end
end

module CommandCaptureMiddleware
CapturedCommand = Struct.new(:server_url, :command, :pipelined, keyword_init: true) do
def inspect
"#<#{self.class.name} [on #{server_url}] #{command.join(' ')} >"
end
end

def call(command, redis_config)
redis_config.custom[:captured_commands] << CapturedCommand.new(
server_url: redis_config.server_url,
command: command,
pipelined: false
)
super
end

def call_pipelined(commands, redis_config)
commands.map do |command|
redis_config.custom[:captured_commands] << CapturedCommand.new(
server_url: redis_config.server_url,
command: command,
pipelined: true
)
end
super
end
end

class PrimaryOnly < TestingWrapper
include Mixin

def new_test_client
config = ::RedisClient::ClusterConfig.new(
nodes: TEST_NODE_URIS,
fixed_hostname: TEST_FIXED_HOSTNAME,
middlewares: [CommandCaptureMiddleware],
custom: { captured_commands: @captured_commands },
**TEST_GENERIC_OPTIONS
)
::RedisClient::Cluster.new(config)
Expand All @@ -541,6 +666,8 @@ def new_test_client
replica: true,
replica_affinity: :random,
fixed_hostname: TEST_FIXED_HOSTNAME,
middlewares: [CommandCaptureMiddleware],
custom: { captured_commands: @captured_commands },
**TEST_GENERIC_OPTIONS
)
::RedisClient::Cluster.new(config)
Expand All @@ -556,6 +683,8 @@ def new_test_client
replica: true,
replica_affinity: :random_with_primary,
fixed_hostname: TEST_FIXED_HOSTNAME,
middlewares: [CommandCaptureMiddleware],
custom: { captured_commands: @captured_commands },
**TEST_GENERIC_OPTIONS
)
::RedisClient::Cluster.new(config)
Expand All @@ -571,6 +700,8 @@ def new_test_client
replica: true,
replica_affinity: :latency,
fixed_hostname: TEST_FIXED_HOSTNAME,
middlewares: [CommandCaptureMiddleware],
custom: { captured_commands: @captured_commands },
**TEST_GENERIC_OPTIONS
)
::RedisClient::Cluster.new(config)
Expand All @@ -584,6 +715,8 @@ def new_test_client
config = ::RedisClient::ClusterConfig.new(
nodes: TEST_NODE_URIS,
fixed_hostname: TEST_FIXED_HOSTNAME,
middlewares: [CommandCaptureMiddleware],
custom: { captured_commands: @captured_commands },
**TEST_GENERIC_OPTIONS
)
::RedisClient::Cluster.new(config, pool: { timeout: TEST_TIMEOUT_SEC, size: 2 })
Expand Down