diff --git a/.rubocop.yml b/.rubocop.yml index 6d1f5936..a81474ce 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -19,6 +19,8 @@ Metrics/ClassLength: Metrics/ModuleLength: Max: 500 + Exclude: + - 'test/**/*' Metrics/MethodLength: Max: 50 diff --git a/README.md b/README.md index 6bda4aaf..b333b935 100644 --- a/README.md +++ b/README.md @@ -168,6 +168,94 @@ cli.call('MGET', '{key}1', '{key}2', '{key}3') #=> [nil, nil, nil] ``` +## Transactions +This gem supports [Redis transactions](https://redis.io/topics/transactions), including atomicity with `MULTI`/`EXEC`, +and conditional execution with `WATCH`. Redis does not support cross-slot transactions, so all keys used within a +transaction must live in the same key slot. To use transactions, you must thus "pin" your client to a single slot using +`#with`. Pass a key to `#with`, and the yielded connection object will enforce that all keys used in the block must +belong to the same slot. + +```ruby +cli.with(key: '{key}1') do |conn| + conn.call('SET', '{key}1', 'This is OK') + #=> "OK" + conn.call('SET', '{key}2', 'This is also OK; it has the same hashtag, and so the same slot') + #=> "OK" + conn.call('SET', '{otherslot}3', 'This will raise an exception; this key is not in the same slot as {key}1') + #=> Connection pinned to slot 12539 but command ["SET", "{otherslot}3", "..."] includes key {otherslot}3 in slot 10271 (RedisClient::Cluster::Transaction::ConsistencyError) +end +``` + +When using hash tags to enforce same-slot key hashing, it's often neat to simply pass the hashtag _only_ to `#with`: + +```ruby +cli.with(key: '{myslot}') do |conn| + # You can use any key starting with {myslot} in this block +end +``` + +Once you have pinned a client to a particular slot, you can use the same transaction APIs as the +[redis-client](https://github.com/redis-rb/redis-client#usage) gem allows. + +```ruby +# No concurrent client will ever see the value 1 in 'mykey'; it will see either zero or two. +cli.call('SET', 'key', 0) +cli.with(key: 'key') do |conn| + conn.multi do |txn| + txn.call('INCR', 'key') + txn.call('INCR', 'key') + end + #=> ['OK', 'OK'] +end + +# Conditional execution with WATCH can be used to e.g. atomically swap two keys +cli.call('MSET', '{key}1', 'v1', '{key}2', 'v2') +cli.with(key: '{key}') do |conn| + conn.call('WATCH', '{key}1', '{key}2') + conn.multi do |txn| + old_key1 = conn.call('GET', '{key}1') + old_key2 = conn.call('GET', '{key}2') + txn.call('SET', '{key}1', old_key2) + txn.call('SET', '{key}2', old_key1) + end + # This transaction will swap the values of {key}1 and {key}2 only if no concurrent connection modified + # either of the values +end + +# You can also pass watch: to #multi as a shortcut +cli.call('MSET', '{key}1', 'v1', '{key}2', 'v2') +cli.with(key: '{key}') do |conn| + conn.multi(watch: ['{key}1', '{key}2']) do |txn| + old_key1, old_key2 = conn.call('MGET', '{key}1', '{key}2') + txn.call('MSET', '{key}1', old_key2, '{key}2', old_key1) + end +end +``` + +Pinned connections are aware of redirections and node failures like ordinary calls to `RedisClient::Cluster`, but because +you may have written non-idempotent code inside your block, the block is not automatically retried if e.g. the slot +it is operating on moves to a different node. If you want this, you can opt-in to retries by passing nonzero +`retry_count` to `#with`. + +```ruby +cli.with(key: '{key}', retry_count: 1) do |conn| + conn.call('GET', '{key}1') + #=> "value1" + + # Now, some changes in cluster topology mean that {key} is moved to a different node! + + conn.call('GET', '{key}2') + #=> MOVED 9039 127.0.0.1:16381 (RedisClient::CommandError) + + # Luckily, the block will get retried (once) and so both GETs will be re-executed on the newly-discovered + # correct node. +end +``` + +Because `RedisClient` from the redis-client gem implements `#with` as simply `yield self` and ignores all of its +arguments, it's possible to write code which is compatible with both redis-client and redis-cluster-client; the `#with` +call will pin the connection to a slot when using clustering, or be a no-op when not. + ## ACL The cluster client internally calls [COMMAND](https://redis.io/commands/command/) and [CLUSTER NODES](https://redis.io/commands/cluster-nodes/) commands to operate correctly. So please permit it like the followings. diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index 0ea6eccd..27aff18f 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -1,10 +1,10 @@ # frozen_string_literal: true require 'redis_client/cluster/concurrent_worker' +require 'redis_client/cluster/pinning' require 'redis_client/cluster/pipeline' require 'redis_client/cluster/pub_sub' require 'redis_client/cluster/router' -require 'redis_client/cluster/transaction' class RedisClient class Cluster @@ -89,14 +89,34 @@ def pipelined pipeline.execute end - def multi(watch: nil, &block) - ::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block) + def multi(watch: nil, key: nil, &block) + slot_key = key + slot_key = watch&.first if slot_key.nil? + raise ArgumentError, 'watch or key must be provided' if slot_key.nil? + + with(key: slot_key) do |conn| + conn.multi(watch: watch, &block) + end end def pubsub ::RedisClient::Cluster::PubSub.new(@router, @command_builder) end + def with(key:, write: true, retry_count: 0) + raise ArgumentError, 'key must be provided' if key.nil? || key.empty? + + slot = ::RedisClient::Cluster::KeySlotConverter.convert(key) + node_key = @router.find_node_key_by_key(key, primary: write) + node = @router.find_node(node_key) + # Calling #with checks out the underlying connection if this is a pooled connection + # Calling it through #try_delegate ensures we handle any redirections and retry the entire + # transaction if so. + @router.try_delegate(node, :with, retry_count: retry_count) do |conn| + yield ::RedisClient::Cluster::Pinning::ConnectionProxy.new(conn, slot, @router, @command_builder) + end + end + def close @concurrent_worker.close @router.close diff --git a/lib/redis_client/cluster/command.rb b/lib/redis_client/cluster/command.rb index 05460482..dd7ca683 100644 --- a/lib/redis_client/cluster/command.rb +++ b/lib/redis_client/cluster/command.rb @@ -2,19 +2,21 @@ require 'redis_client' require 'redis_client/cluster/errors' +require 'redis_client/cluster/key_slot_converter' require 'redis_client/cluster/normalized_cmd_name' class RedisClient class Cluster class Command EMPTY_STRING = '' - LEFT_BRACKET = '{' - RIGHT_BRACKET = '}' EMPTY_HASH = {}.freeze + EMPTY_ARRAY = [].freeze Detail = Struct.new( 'RedisCommand', :first_key_position, + :last_key_position, + :key_step, :write?, :readonly?, keyword_init: true @@ -50,6 +52,8 @@ def parse_command_reply(rows) acc[row[0].downcase] = ::RedisClient::Cluster::Command::Detail.new( first_key_position: row[3], + last_key_position: row[4], + key_step: row[5], write?: row[2].include?('write'), readonly?: row[2].include?('readonly') ) @@ -65,9 +69,18 @@ def extract_first_key(command) i = determine_first_key_position(command) return EMPTY_STRING if i == 0 - key = (command[i].is_a?(Array) ? command[i].flatten.first : command[i]).to_s - hash_tag = extract_hash_tag(key) - hash_tag.empty? ? key : hash_tag + (command[i].is_a?(Array) ? command[i].flatten.first : command[i]).to_s + end + + def extract_all_keys(command) + keys_start = determine_first_key_position(command) + keys_end = determine_last_key_position(command, keys_start) + keys_step = determine_key_step(command) + return EMPTY_ARRAY if [keys_start, keys_end, keys_step].any?(&:zero?) + + keys_end = [keys_end, command.size - 1].min + # use .. inclusive range because keys_end is a valid index. + (keys_start..keys_end).step(keys_step).map { |i| command[i] } end def should_send_to_primary?(command) @@ -101,21 +114,44 @@ def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticCo end end - def determine_optional_key_position(command, option_name) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity - idx = command&.flatten&.map(&:to_s)&.map(&:downcase)&.index(option_name&.downcase) - idx.nil? ? 0 : idx + 1 + # IMPORTANT: this determines the last key position INCLUSIVE of the last key - + # i.e. command[determine_last_key_position(command)] is a key. + # This is in line with what Redis returns from COMMANDS. + def determine_last_key_position(command, keys_start) # rubocop:disable Metrics/AbcSize + case name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command) + when 'eval', 'evalsha', 'zinterstore', 'zunionstore' + # EVALSHA sha1 numkeys [key [key ...]] [arg [arg ...]] + # ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE ] + command[2].to_i + 2 + when 'object', 'memory' + # OBJECT [ENCODING | FREQ | IDLETIME | REFCOUNT] key + # MEMORY USAGE key [SAMPLES count] + keys_start + when 'migrate' + # MIGRATE host port destination-db timeout [COPY] [REPLACE] [AUTH password | AUTH2 username password] [KEYS key [key ...]] + command[3].empty? ? (command.length - 1) : 3 + when 'xread', 'xreadgroup' + # XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...] + keys_start + ((command.length - keys_start) / 2) - 1 + else + # If there is a fixed, non-variable number of keys, don't iterate past that. + if @commands[name].last_key_position >= 0 + @commands[name].last_key_position + else + command.length + @commands[name].last_key_position + end + end end - # @see https://redis.io/topics/cluster-spec#keys-hash-tags Keys hash tags - def extract_hash_tag(key) - key = key.to_s - s = key.index(LEFT_BRACKET) - return EMPTY_STRING if s.nil? - - e = key.index(RIGHT_BRACKET, s + 1) - return EMPTY_STRING if e.nil? + def determine_key_step(command) + name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command) + # Some commands like EVALSHA have zero as the step in COMMANDS somehow. + @commands[name].key_step == 0 ? 1 : @commands[name].key_step + end - key[s + 1..e - 1] + def determine_optional_key_position(command, option_name) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + idx = command&.flatten&.map(&:to_s)&.map(&:downcase)&.index(option_name&.downcase) + idx.nil? ? 0 : idx + 1 end end end diff --git a/lib/redis_client/cluster/errors.rb b/lib/redis_client/cluster/errors.rb index 0367b3d4..33dc5dcf 100644 --- a/lib/redis_client/cluster/errors.rb +++ b/lib/redis_client/cluster/errors.rb @@ -55,5 +55,11 @@ def initialize(_ = '') ) end end + + class BlockingReadTimeoutError < ::RedisClient::ReadTimeoutError; end + + module Transaction + ConsistencyError = Class.new(::RedisClient::Error) + end end end diff --git a/lib/redis_client/cluster/key_slot_converter.rb b/lib/redis_client/cluster/key_slot_converter.rb index cbb64eaf..8fee4910 100644 --- a/lib/redis_client/cluster/key_slot_converter.rb +++ b/lib/redis_client/cluster/key_slot_converter.rb @@ -3,6 +3,9 @@ class RedisClient class Cluster module KeySlotConverter + EMPTY_STRING = '' + LEFT_BRACKET = '{' + RIGHT_BRACKET = '}' XMODEM_CRC16_LOOKUP = [ 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, @@ -45,6 +48,9 @@ module KeySlotConverter def convert(key) return nil if key.nil? + hash_tag = extract_hash_tag(key) + key = hash_tag unless hash_tag.empty? + crc = 0 key.each_byte do |b| crc = ((crc << 8) & 0xffff) ^ XMODEM_CRC16_LOOKUP[((crc >> 8) ^ b) & 0xff] @@ -52,6 +58,18 @@ def convert(key) crc % HASH_SLOTS end + + # @see https://redis.io/topics/cluster-spec#keys-hash-tags Keys hash tags + def extract_hash_tag(key) + key = key.to_s + s = key.index(LEFT_BRACKET) + return EMPTY_STRING if s.nil? + + e = key.index(RIGHT_BRACKET, s + 1) + return EMPTY_STRING if e.nil? + + key[s + 1..e - 1] + end end end end diff --git a/lib/redis_client/cluster/node.rb b/lib/redis_client/cluster/node.rb index dc362e18..46a27d7b 100644 --- a/lib/redis_client/cluster/node.rb +++ b/lib/redis_client/cluster/node.rb @@ -80,7 +80,7 @@ def []=(index, element) class Config < ::RedisClient::Config def initialize(scale_read: false, **kwargs) @scale_read = scale_read - super(**kwargs) + super(**kwargs, client_implementation: Client) end private @@ -92,6 +92,22 @@ def build_connection_prelude end end + class Client < ::RedisClient + # We need to be able to differentiate between timeout errors caused by blocking read timeouts + # (which should NOT cause a cluster topology update) with normal read timeouts (which should) + def blocking_call(timeout, *command, **kwargs) + super + rescue ::RedisClient::TimeoutError => e + raise ::RedisClient::Cluster::BlockingReadTimeoutError, e.message + end + + def blocking_call_v(timeout, command) + super + rescue ::RedisClient::TimeoutError => e + raise ::RedisClient::Cluster::BlockingReadTimeoutError, e.message + end + end + class << self def load_info(options, concurrent_worker, slow_command_timeout: -1, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity raise ::RedisClient::Cluster::InitialSetupError, [] if options.nil? || options.empty? diff --git a/lib/redis_client/cluster/pinning.rb b/lib/redis_client/cluster/pinning.rb new file mode 100644 index 00000000..da19b339 --- /dev/null +++ b/lib/redis_client/cluster/pinning.rb @@ -0,0 +1,123 @@ +# frozen_string_literal: true + +require 'redis_client' +require 'delegate' + +class RedisClient + class Cluster + module Pinning + EMPTY_ARRAY = [].freeze + + class BaseDelegator < SimpleDelegator + def initialize(conn, slot, router, command_builder) + @conn = conn + @slot = slot + @router = router + @command_builder = command_builder + super(@conn) + end + + private + + def ensure_key_not_cross_slot!(key, method_name) + slot = ::RedisClient::Cluster::KeySlotConverter.convert(key) + return unless slot != @slot + + raise ::RedisClient::Cluster::Transaction::ConsistencyError.new, + "Connection pinned to slot #{@slot} but method #{method_name} called with key #{key} in slot #{slot}" + end + + def ensure_command_not_cross_slot!(command) + command_keys = @router.command_extract_all_keys(command) + command_keys.each do |key| + slot = ::RedisClient::Cluster::KeySlotConverter.convert(key) + if slot != @slot + raise ::RedisClient::Cluster::Transaction::ConsistencyError.new, + "Connection pinned to slot #{@slot} but command #{command.inspect} includes key #{key} in slot #{slot}" + end + end + end + end + + module CallDelegator + def call(*command, **kwargs) + command = @command_builder.generate(command, kwargs) + ensure_command_not_cross_slot! command + super + end + alias call_once call + + def call_v(command) + command = @command_builder.generate(command) + ensure_command_not_cross_slot! command + super + end + alias call_once_v call_v + end + + module BlockingCallDelegator + def blocking_call(timeout, *command, **kwargs) + command = @command_builder.generate(command, kwargs) + ensure_command_not_cross_slot! command + super + end + + def blocking_call_v(timeout, command) + command = @command_builder.generate(command) + ensure_command_not_cross_slot! command + super + end + end + + class PipelineProxy < BaseDelegator + include CallDelegator + include BlockingCallDelegator + end + + class MultiProxy < BaseDelegator + include CallDelegator + end + + class ConnectionProxy < BaseDelegator + include CallDelegator + include BlockingCallDelegator + + def sscan(key, *args, **kwargs, &block) + ensure_key_not_cross_slot! key, :sscan + super + end + + def hscan(key, *args, **kwargs, &block) + ensure_key_not_cross_slot! key, :hscan + super + end + + def zscan(key, *args, **kwargs, &block) + ensure_key_not_cross_slot! key, :zscan + super + end + + def pipelined + @conn.pipelined do |conn_pipeline| + yield PipelineProxy.new(conn_pipeline, @slot, @router, @command_builder) + end + end + + def multi(watch: nil) + call('WATCH', *watch) if watch + begin + @conn.pipelined do |conn_pipeline| + txn = MultiProxy.new(conn_pipeline, @slot, @router, @command_builder) + txn.call('MULTI') + yield txn + txn.call('EXEC') + end.last + rescue StandardError + call('UNWATCH') if connected? && watch + raise + end + end + end + end + end +end diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index 11c6e22b..40ce08ce 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -13,7 +13,6 @@ class RedisClient class Cluster class Router ZERO_CURSOR_FOR_SCAN = '0' - METHODS_FOR_BLOCKING_CMD = %i[blocking_call_v blocking_call].freeze TSF = ->(f, x) { f.nil? ? x : f.call(x) }.curry def initialize(config, concurrent_worker, pool: nil, **kwargs) @@ -83,60 +82,59 @@ def try_send(node, method, command, args, retry_count: 3, &block) # rubocop:disa rescue ::RedisClient::CircuitBreaker::OpenCircuitError raise rescue ::RedisClient::CommandError => e - raise if retry_count <= 0 - if e.message.start_with?('MOVED') node = assign_redirection_node(e.message) retry_count -= 1 - retry + retry if retry_count >= 0 elsif e.message.start_with?('ASK') node = assign_asking_node(e.message) node.call('ASKING') retry_count -= 1 - retry + retry if retry_count >= 0 elsif e.message.start_with?('CLUSTERDOWN Hash slot not served') update_cluster_info! retry_count -= 1 - retry - else - raise + retry if retry_count >= 0 end + raise + rescue ::RedisClient::Cluster::BlockingReadTimeoutError + raise rescue ::RedisClient::ConnectionError => e - raise if METHODS_FOR_BLOCKING_CMD.include?(method) && e.is_a?(RedisClient::ReadTimeoutError) + update_cluster_info! + raise if retry_count <= 0 - update_cluster_info! retry_count -= 1 retry end - def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block) # rubocop:disable Metrics/AbcSize + def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity node.public_send(method, *args, **kwargs, &block) rescue ::RedisClient::CircuitBreaker::OpenCircuitError raise rescue ::RedisClient::CommandError => e - raise if retry_count <= 0 - if e.message.start_with?('MOVED') node = assign_redirection_node(e.message) retry_count -= 1 - retry + retry if retry_count >= 0 elsif e.message.start_with?('ASK') node = assign_asking_node(e.message) node.call('ASKING') retry_count -= 1 - retry + retry if retry_count >= 0 elsif e.message.start_with?('CLUSTERDOWN Hash slot not served') update_cluster_info! retry_count -= 1 - retry - else - raise + retry if retry_count >= 0 end + raise + rescue ::RedisClient::Cluster::BlockingReadTimeoutError + raise rescue ::RedisClient::ConnectionError + update_cluster_info! + raise if retry_count <= 0 - update_cluster_info! retry_count -= 1 retry end @@ -170,21 +168,25 @@ def assign_node(command) find_node(node_key) end - def find_node_key(command, seed: nil) - key = @command.extract_first_key(command) - slot = key.empty? ? nil : ::RedisClient::Cluster::KeySlotConverter.convert(key) - - if @command.should_send_to_primary?(command) - @node.find_node_key_of_primary(slot) || @node.any_primary_node_key(seed: seed) + def find_node_key_by_key(key, seed: nil, primary: false) + if key && !key.empty? + slot = ::RedisClient::Cluster::KeySlotConverter.convert(key) + primary ? @node.find_node_key_of_primary(slot) : @node.find_node_key_of_replica(slot) else - @node.find_node_key_of_replica(slot, seed: seed) || @node.any_replica_node_key(seed: seed) + primary ? @node.any_primary_node_key(seed: seed) : @node.any_replica_node_key(seed: seed) end end + def find_node_key(command, seed: nil) + key = @command.extract_first_key(command) + find_node_key_by_key(key, seed: seed, primary: @command.should_send_to_primary?(command)) + end + def find_primary_node_key(command) key = @command.extract_first_key(command) - slot = key.empty? ? nil : ::RedisClient::Cluster::KeySlotConverter.convert(key) - @node.find_node_key_of_primary(slot) + return nil unless key&.size&.> 0 + + find_node_key_by_key(key, primary: true) end def find_node(node_key, retry_count: 3) @@ -201,6 +203,10 @@ def command_exists?(name) @command.exists?(name) end + def command_extract_all_keys(command) + @command.extract_all_keys(command) + end + def assign_redirection_node(err_msg) _, slot, node_key = err_msg.split slot = slot.to_i diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb deleted file mode 100644 index 7302a283..00000000 --- a/lib/redis_client/cluster/transaction.rb +++ /dev/null @@ -1,57 +0,0 @@ -# frozen_string_literal: true - -require 'redis_client' - -class RedisClient - class Cluster - class Transaction - ConsistencyError = Class.new(::RedisClient::Error) - - def initialize(router, command_builder) - @router = router - @command_builder = command_builder - @node_key = nil - end - - def call(*command, **kwargs, &_) - command = @command_builder.generate(command, kwargs) - ensure_node_key(command) - end - - def call_v(command, &_) - command = @command_builder.generate(command) - ensure_node_key(command) - end - - def call_once(*command, **kwargs, &_) - command = @command_builder.generate(command, kwargs) - ensure_node_key(command) - end - - def call_once_v(command, &_) - command = @command_builder.generate(command) - ensure_node_key(command) - end - - def execute(watch: nil, &block) - yield self - raise ArgumentError, 'empty transaction' if @node_key.nil? - - node = @router.find_node(@node_key) - @router.try_delegate(node, :multi, watch: watch, &block) - end - - private - - def ensure_node_key(command) - node_key = @router.find_primary_node_key(command) - raise ConsistencyError, "Client couldn't determine the node to be executed the transaction by: #{command}" if node_key.nil? - - @node_key ||= node_key - raise ConsistencyError, "The transaction should be done for single node: #{@node_key}, #{node_key}" if node_key != @node_key - - nil - end - end - end -end diff --git a/test/command_capture_middleware.rb b/test/command_capture_middleware.rb new file mode 100644 index 00000000..362e187e --- /dev/null +++ b/test/command_capture_middleware.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module CommandCaptureMiddleware + CapturedCommand = Struct.new(:server_url, :command, :pipelined, keyword_init: true) do + def inspect + "#<#{self.class.name} [on #{server_url}] #{command.join(' ')} >" + end + end + + def call(command, redis_config) + redis_config.custom[:captured_commands] << CapturedCommand.new( + server_url: redis_config.server_url, + command: command, + pipelined: false + ) + super + end + + def call_pipelined(commands, redis_config) + commands.map do |command| + redis_config.custom[:captured_commands] << CapturedCommand.new( + server_url: redis_config.server_url, + command: command, + pipelined: true + ) + end + super + end +end diff --git a/test/redis_client/cluster/node/test_latency_replica.rb b/test/redis_client/cluster/node/test_latency_replica.rb index 66987523..d85ffdfd 100644 --- a/test/redis_client/cluster/node/test_latency_replica.rb +++ b/test/redis_client/cluster/node/test_latency_replica.rb @@ -11,7 +11,7 @@ class TestLatencyReplica < TestingWrapper def test_clients_with_redis_client got = @test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient, client) } + got.each_value { |client| assert_kind_of(::RedisClient, client) } assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) end @@ -25,7 +25,7 @@ def test_clients_with_pooled_redis_client ) got = test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient::Pooled, client) } + got.each_value { |client| assert_kind_of(::RedisClient::Pooled, client) } assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) ensure test_topology&.clients&.each_value(&:close) @@ -34,7 +34,7 @@ def test_clients_with_pooled_redis_client def test_primary_clients got = @test_topology.primary_clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -42,14 +42,14 @@ def test_primary_clients def test_replica_clients got = @test_topology.replica_clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('slave', client.call('ROLE').first) end end def test_clients_for_scanning got = @test_topology.clients_for_scanning - got.each_value { |client| assert_instance_of(::RedisClient, client) } + got.each_value { |client| assert_kind_of(::RedisClient, client) } assert_equal(TEST_SHARD_SIZE, got.size) end diff --git a/test/redis_client/cluster/node/test_primary_only.rb b/test/redis_client/cluster/node/test_primary_only.rb index ec60afdd..e8983dd8 100644 --- a/test/redis_client/cluster/node/test_primary_only.rb +++ b/test/redis_client/cluster/node/test_primary_only.rb @@ -12,7 +12,7 @@ class TestPrimaryOnly < TestingWrapper def test_clients_with_redis_client got = @test_topology.clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -28,7 +28,7 @@ def test_clients_with_pooled_redis_client got = test_topology.clients got.each_value do |client| - assert_instance_of(::RedisClient::Pooled, client) + assert_kind_of(::RedisClient::Pooled, client) assert_equal('master', client.call('ROLE').first) end ensure @@ -38,7 +38,7 @@ def test_clients_with_pooled_redis_client def test_primary_clients got = @test_topology.primary_clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -46,7 +46,7 @@ def test_primary_clients def test_replica_clients got = @test_topology.replica_clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -54,7 +54,7 @@ def test_replica_clients def test_clients_for_scanning got = @test_topology.clients_for_scanning got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end diff --git a/test/redis_client/cluster/node/test_random_replica.rb b/test/redis_client/cluster/node/test_random_replica.rb index 1116bf7a..969806c2 100644 --- a/test/redis_client/cluster/node/test_random_replica.rb +++ b/test/redis_client/cluster/node/test_random_replica.rb @@ -11,7 +11,7 @@ class TestRandomReplica < TestingWrapper def test_clients_with_redis_client got = @test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient, client) } + got.each_value { |client| assert_kind_of(::RedisClient, client) } assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) end @@ -25,7 +25,7 @@ def test_clients_with_pooled_redis_client ) got = test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient::Pooled, client) } + got.each_value { |client| assert_kind_of(::RedisClient::Pooled, client) } assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) ensure test_topology&.clients&.each_value(&:close) @@ -34,7 +34,7 @@ def test_clients_with_pooled_redis_client def test_primary_clients got = @test_topology.primary_clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -42,14 +42,14 @@ def test_primary_clients def test_replica_clients got = @test_topology.replica_clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('slave', client.call('ROLE').first) end end def test_clients_for_scanning got = @test_topology.clients_for_scanning - got.each_value { |client| assert_instance_of(::RedisClient, client) } + got.each_value { |client| assert_kind_of(::RedisClient, client) } assert_equal(TEST_SHARD_SIZE, got.size) end diff --git a/test/redis_client/cluster/node/test_random_replica_or_primary.rb b/test/redis_client/cluster/node/test_random_replica_or_primary.rb index 33319d20..a40e5b72 100644 --- a/test/redis_client/cluster/node/test_random_replica_or_primary.rb +++ b/test/redis_client/cluster/node/test_random_replica_or_primary.rb @@ -11,7 +11,7 @@ class TestRandomReplicaWithPrimary < TestingWrapper def test_clients_with_redis_client got = @test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient, client) } + got.each_value { |client| assert_kind_of(::RedisClient, client) } assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) end @@ -25,7 +25,7 @@ def test_clients_with_pooled_redis_client ) got = test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient::Pooled, client) } + got.each_value { |client| assert_kind_of(::RedisClient::Pooled, client) } assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) ensure test_topology&.clients&.each_value(&:close) @@ -34,7 +34,7 @@ def test_clients_with_pooled_redis_client def test_primary_clients got = @test_topology.primary_clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -42,14 +42,14 @@ def test_primary_clients def test_replica_clients got = @test_topology.replica_clients got.each_value do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('slave', client.call('ROLE').first) end end def test_clients_for_scanning got = @test_topology.clients_for_scanning - got.each_value { |client| assert_instance_of(::RedisClient, client) } + got.each_value { |client| assert_kind_of(::RedisClient, client) } assert_equal(TEST_SHARD_SIZE, got.size) end diff --git a/test/redis_client/cluster/test_command.rb b/test/redis_client/cluster/test_command.rb index b34bb2ae..34a992ad 100644 --- a/test/redis_client/cluster/test_command.rb +++ b/test/redis_client/cluster/test_command.rb @@ -49,20 +49,20 @@ def test_parse_command_reply [ { rows: [ - ['get', 2, Set['readonly', 'fast'], 1, 1, 1, Set['@read', '@string', '@fast'], Set[], Set[], Set[]], - ['set', -3, Set['write', 'denyoom', 'movablekeys'], 1, 1, 1, Set['@write', '@string', '@slow'], Set[], Set[], Set[]] + ['get', 2, Set['readonly', 'fast'], 1, -1, 1, Set['@read', '@string', '@fast'], Set[], Set[], Set[]], + ['set', -3, Set['write', 'denyoom', 'movablekeys'], 1, -1, 2, Set['@write', '@string', '@slow'], Set[], Set[], Set[]] ], want: { - 'get' => { first_key_position: 1, write?: false, readonly?: true }, - 'set' => { first_key_position: 1, write?: true, readonly?: false } + 'get' => { first_key_position: 1, last_key_position: -1, key_step: 1, write?: false, readonly?: true }, + 'set' => { first_key_position: 1, last_key_position: -1, key_step: 2, write?: true, readonly?: false } } }, { rows: [ - ['GET', 2, Set['readonly', 'fast'], 1, 1, 1, Set['@read', '@string', '@fast'], Set[], Set[], Set[]] + ['GET', 2, Set['readonly', 'fast'], 1, -1, 1, Set['@read', '@string', '@fast'], Set[], Set[], Set[]] ], want: { - 'get' => { first_key_position: 1, write?: false, readonly?: true } + 'get' => { first_key_position: 1, last_key_position: -1, key_step: 1, write?: false, readonly?: true } } }, { rows: [[]], want: {} }, @@ -84,7 +84,7 @@ def test_extract_first_key [ { command: %w[SET foo 1], want: 'foo' }, { command: %w[GET foo], want: 'foo' }, - { command: %w[GET foo{bar}baz], want: 'bar' }, + { command: %w[GET foo{bar}baz], want: 'foo{bar}baz' }, { command: %w[MGET foo bar baz], want: 'foo' }, { command: %w[UNKNOWN foo bar], want: '' }, { command: [['GET'], 'foo'], want: 'foo' }, @@ -191,24 +191,33 @@ def test_determine_optional_key_position end end - def test_extract_hash_tag + def test_extract_all_keys cmd = ::RedisClient::Cluster::Command.load(@raw_clients) [ - { key: 'foo', want: '' }, - { key: 'foo{bar}baz', want: 'bar' }, - { key: 'foo{bar}baz{qux}quuc', want: 'bar' }, - { key: 'foo}bar{baz', want: '' }, - { key: 'foo{bar', want: '' }, - { key: 'foo}bar', want: '' }, - { key: 'foo{}bar', want: '' }, - { key: '{}foo', want: '' }, - { key: 'foo{}', want: '' }, - { key: '{}', want: '' }, - { key: '', want: '' }, - { key: nil, want: '' } + { command: ['EVAL', 'return ARGV[1]', '0', 'hello'], want: [] }, + { command: ['EVAL', 'return ARGV[1]', '3', 'key1', 'key2', 'key3', 'arg1', 'arg2'], want: %w[key1 key2 key3] }, + { command: [['EVAL'], '"return ARGV[1]"', 0, 'hello'], want: [] }, + { command: %w[EVALSHA sha1 2 foo bar baz zap], want: %w[foo bar] }, + { command: %w[MIGRATE host port key 0 5 COPY], want: %w[key] }, + { command: ['MIGRATE', 'host', 'port', '', '0', '5', 'COPY', 'KEYS', 'key1'], want: %w[key1] }, + { command: ['MIGRATE', 'host', 'port', '', '0', '5', 'COPY', 'KEYS', 'key1', 'key2'], want: %w[key1 key2] }, + { command: %w[ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3], want: %w[zset1 zset2] }, + { command: %w[ZUNIONSTORE out 2 zset1 zset2 WEIGHTS 2 3], want: %w[zset1 zset2] }, + { command: %w[OBJECT HELP], want: [] }, + { command: %w[MEMORY HELP], want: [] }, + { command: %w[MEMORY USAGE key], want: %w[key] }, + { command: %w[XREAD COUNT 2 STREAMS mystream writers 0-0 0-0], want: %w[mystream writers] }, + { command: %w[XREADGROUP GROUP group consumer STREAMS key id], want: %w[key] }, + { command: %w[SET foo 1], want: %w[foo] }, + { command: %w[set foo 1], want: %w[foo] }, + { command: [['SET'], 'foo', 1], want: %w[foo] }, + { command: %w[GET foo], want: %w[foo] }, + { command: %w[MGET foo bar baz], want: %w[foo bar baz] }, + { command: %w[MSET foo val bar val baz val], want: %w[foo bar baz] }, + { command: %w[BLPOP foo bar 0], want: %w[foo bar] } ].each_with_index do |c, idx| msg = "Case: #{idx}" - got = cmd.send(:extract_hash_tag, c[:key]) + got = cmd.send(:extract_all_keys, c[:command]) assert_equal(c[:want], got, msg) end end diff --git a/test/redis_client/cluster/test_key_slot_converter.rb b/test/redis_client/cluster/test_key_slot_converter.rb index 48973adc..7de7609d 100644 --- a/test/redis_client/cluster/test_key_slot_converter.rb +++ b/test/redis_client/cluster/test_key_slot_converter.rb @@ -27,6 +27,27 @@ def test_convert got = ::RedisClient::Cluster::KeySlotConverter.convert(multi_byte_key) assert_equal(want, got, "Case: #{multi_byte_key}") end + + def test_extract_hash_tag + [ + { key: 'foo', want: '' }, + { key: 'foo{bar}baz', want: 'bar' }, + { key: 'foo{bar}baz{qux}quuc', want: 'bar' }, + { key: 'foo}bar{baz', want: '' }, + { key: 'foo{bar', want: '' }, + { key: 'foo}bar', want: '' }, + { key: 'foo{}bar', want: '' }, + { key: '{}foo', want: '' }, + { key: 'foo{}', want: '' }, + { key: '{}', want: '' }, + { key: '', want: '' }, + { key: nil, want: '' } + ].each_with_index do |c, idx| + msg = "Case: #{idx}" + got = ::RedisClient::Cluster::KeySlotConverter.extract_hash_tag(c[:key]) + assert_equal(c[:want], got, msg) + end + end end end end diff --git a/test/redis_client/cluster/test_node.rb b/test/redis_client/cluster/test_node.rb index d0819279..d78d765d 100644 --- a/test/redis_client/cluster/test_node.rb +++ b/test/redis_client/cluster/test_node.rb @@ -331,14 +331,14 @@ def test_find_by msg = "Case: primary only: #{info.node_key}" got = -> { @test_node.find_by(info.node_key) } if info.primary? - assert_instance_of(::RedisClient, got.call, msg) + assert_kind_of(::RedisClient, got.call, msg) else assert_raises(::RedisClient::Cluster::Node::ReloadNeeded, msg, &got) end msg = "Case: scale read: #{info.node_key}" got = @test_node_with_scale_read.find_by(info.node_key) - assert_instance_of(::RedisClient, got, msg) + assert_kind_of(::RedisClient, got, msg) end end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 02f477f7..2d387799 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -6,9 +6,11 @@ class RedisClient class TestCluster module Mixin def setup + @captured_commands = [] @client = new_test_client @client.call('FLUSHDB') wait_for_replication + @captured_commands.clear end def teardown @@ -176,7 +178,7 @@ def test_pipelined_with_many_commands end def test_transaction_with_single_key - got = @client.multi do |t| + got = @client.multi(key: 'counter') do |t| t.call('SET', 'counter', '0') t.call('INCR', 'counter') t.call('INCR', 'counter') @@ -188,7 +190,7 @@ def test_transaction_with_single_key def test_transaction_with_multiple_key assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do - @client.multi do |t| + @client.multi(key: 'key1') do |t| t.call('SET', 'key1', '1') t.call('SET', 'key2', '2') t.call('SET', 'key3', '3') @@ -202,20 +204,19 @@ def test_transaction_with_multiple_key def test_transaction_with_empty_block assert_raises(ArgumentError) { @client.multi {} } - assert_raises(LocalJumpError) { @client.multi } + assert_raises(LocalJumpError) { @client.multi(key: 'foo') } end def test_transaction_with_keyless_commands - assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do - @client.multi do |t| - t.call('ECHO', 'foo') - t.call('ECHO', 'bar') - end + got = @client.multi(key: 'hello') do |t| + t.call('ECHO', 'foo') + t.call('ECHO', 'bar') end + assert_equal %w[foo bar], got end def test_transaction_with_hashtag - got = @client.multi do |t| + got = @client.multi(key: '{key}') do |t| t.call('MSET', '{key}1', '1', '{key}2', '2') t.call('MSET', '{key}3', '3', '{key}4', '4') end @@ -226,14 +227,14 @@ def test_transaction_with_hashtag def test_transaction_without_hashtag assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do - @client.multi do |t| + @client.multi(key: 'key1') do |t| t.call('MSET', 'key1', '1', 'key2', '2') t.call('MSET', 'key3', '3', 'key4', '4') end end - assert_raises(::RedisClient::CommandError, 'CROSSSLOT keys') do - @client.multi do |t| + assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do + @client.multi(key: 'key1') do |t| t.call('MSET', 'key1', '1', 'key2', '2') t.call('MSET', 'key1', '1', 'key3', '3') t.call('MSET', 'key1', '1', 'key4', '4') @@ -478,6 +479,213 @@ def test_circuit_breakers cli&.close end + def test_pinning_single_key + got = @client.with(key: 'key1') do |conn| + conn.call('SET', 'key1', 'hello') + conn.call('GET', 'key1') + end + assert_equal('hello', got) + end + + def test_pinning_no_key + assert_raises(ArgumentError) do + @client.with(key: nil) {} + end + end + + def test_pinning_two_keys + got = @client.with(key: '{slot}') do |conn| + conn.call('SET', '{slot}key1', 'v1') + conn.call('SET', '{slot}key2', 'v2') + conn.call('MGET', '{slot}key1', '{slot}key2') + end + assert_equal(%w[v1 v2], got) + end + + def test_pinning_cross_slot + assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do + @client.with(key: '{slot1}') do |conn| + conn.call('GET', '{slot2}') + end + end + end + + def test_pinning_pipeline + got = @client.with(key: '{slot}') do |conn| + conn.call_v(['SET', '{slot}counter', 0]) + conn.pipelined do |pipe| + pipe.call_v(['INCR', '{slot}counter']) + pipe.call_v(['INCR', '{slot}counter']) + pipe.call_v(['INCR', '{slot}counter']) + end + conn.call_v(['GET', '{slot}counter']).to_i + end + + assert_equal(3, got) + end + + def test_pinning_pipeline_with_error + assert_raises(RedisClient::CommandError) do + @client.with(key: '{slot}') do |conn| + conn.pipelined do |pipeline| + pipeline.call('SET', '{slot}key', 'first') + pipeline.call('SET', '{slot}key', 'second', 'too many args') + pipeline.call('SET', '{slot}key', 'third') + end + end + end + + wait_for_replication + assert_equal('third', @client.call('GET', '{slot}key')) + end + + def test_pinning_transaction + got = @client.with(key: '{slot}') do |conn| + conn.multi do |txn| + txn.call('SET', '{slot}key1', 'value1') + txn.call('SET', '{slot}key2', 'value2') + end + end + + assert_equal(%w[OK OK], got) + end + + def test_pinning_transaction_watch_arg + @client.call('MSET', '{slot}key1', 'val1', '{slot}key2', 'val2') + @captured_commands.clear + + got = @client.with(key: '{slot}') do |conn| + conn.multi(watch: ['{slot}key1', '{slot}key2']) do |txn| + old_key1, old_key2 = conn.call('MGET', '{slot}key1', '{slot}key2') + txn.call('MSET', '{slot}key1', old_key2, '{slot}key2', old_key1) + end + end + + assert_equal([ + %w[WATCH {slot}key1 {slot}key2], + %w[MGET {slot}key1 {slot}key2], + %w[MULTI], + %w[MSET {slot}key1 val2 {slot}key2 val1], + %w[EXEC] + ], @captured_commands.map(&:command)) + + wait_for_replication + assert_equal(['OK'], got) + assert_equal(%w[val2 val1], @client.call('MGET', '{slot}key1', '{slot}key2')) + end + + def test_pinning_transaction_watch_arg_unwatches_on_raise + ex = Class.new(StandardError) + @captured_commands.clear + + assert_raises(ex) do + @client.with(key: '{slot}') do |conn| + conn.multi(watch: ['{slot}key1']) do |_txn| + conn.call('GET', '{slot}key1') + raise ex, 'boom' + end + end + end + + assert_equal([ + %w[WATCH {slot}key1], + %w[GET {slot}key1], + %w[UNWATCH] + ], @captured_commands.map(&:command)) + end + + def test_pinning_transaction_can_watch_manually + @client.call('MSET', '{slot}key1', 'val1', '{slot}key2', 'val2') + @captured_commands.clear + + got = @client.with(key: '{slot}') do |conn| + conn.call('WATCH', '{slot}key1', '{slot}key2') + old_key1, old_key2 = conn.call('MGET', '{slot}key1', '{slot}key2') + conn.multi do |txn| + txn.call('MSET', '{slot}key1', old_key2, '{slot}key2', old_key1) + end + end + + assert_equal([ + %w[WATCH {slot}key1 {slot}key2], + %w[MGET {slot}key1 {slot}key2], + %w[MULTI], + %w[MSET {slot}key1 val2 {slot}key2 val1], + %w[EXEC] + ], @captured_commands.map(&:command)) + + wait_for_replication + assert_equal(['OK'], got) + assert_equal(%w[val2 val1], @client.call('MGET', '{slot}key1', '{slot}key2')) + end + + def test_pinning_transaction_can_unwatch_manually + got = @client.with(key: '{slot}') do |conn| + conn.call('WATCH', '{slot}key1') + conn.call('UNWATCH') + end + + assert_equal('OK', got) + end + + def test_pinning_nonblocking_timeouts_update_topology + @client.call('DEL', '{slot}list') + @captured_commands.clear + + assert_raises(::RedisClient::ReadTimeoutError) do + @client.with(key: '{slot}') do |conn| + conn.call_v(['BLPOP', '{slot}list', 0]) + end + end + assert_includes(@captured_commands.map(&:command), %w[CLUSTER NODES]) + end + + def test_pinning_blocking_timeouts_do_not_update_topology + @client.call('DEL', '{slot}list') + @captured_commands.clear + + assert_raises(::RedisClient::ReadTimeoutError) do + @client.with(key: '{slot}') do |conn| + conn.blocking_call_v(1, ['BLPOP', '{slot}list', 0]) + end + end + refute_includes(@captured_commands.map(&:command), %w[CLUSTER NODES]) + end + + def test_pinning_sscan + @client.call('DEL', '{slot}set') + expected_set = Set.new + scanned_set = Set.new + 1000.times do |i| + expected_set << i + @client.call('SADD', '{slot}set', i) + end + @client.with(key: '{slot}') do |conn| + conn.sscan('{slot}set') do |i| + scanned_set << i.to_i + end + end + + assert_equal(expected_set, scanned_set) + end + + def test_pinning_zscan + @client.call('DEL', '{slot}set') + expected_set = Set.new + scanned_set = Set.new + 1000.times do |i| + expected_set << "member#{i}" + @client.call('ZADD', '{slot}set', i, "member#{i}") + end + @client.with(key: '{slot}') do |conn| + conn.zscan('{slot}set') do |i| + scanned_set << i + end + end + + assert_equal(expected_set, scanned_set) + end + private def wait_for_replication @@ -516,10 +724,12 @@ def hiredis_used? class PrimaryOnly < TestingWrapper include Mixin - def new_test_client + def new_test_client(capture_buffer: @captured_commands) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, fixed_hostname: TEST_FIXED_HOSTNAME, + middlewares: [CommandCaptureMiddleware], + custom: { captured_commands: capture_buffer }, **TEST_GENERIC_OPTIONS ) ::RedisClient::Cluster.new(config) @@ -529,12 +739,14 @@ def new_test_client class ScaleReadRandom < TestingWrapper include Mixin - def new_test_client + def new_test_client(capture_buffer: @captured_commands) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, replica: true, replica_affinity: :random, fixed_hostname: TEST_FIXED_HOSTNAME, + middlewares: [CommandCaptureMiddleware], + custom: { captured_commands: capture_buffer }, **TEST_GENERIC_OPTIONS ) ::RedisClient::Cluster.new(config) @@ -544,12 +756,14 @@ def new_test_client class ScaleReadRandomWithPrimary < TestingWrapper include Mixin - def new_test_client + def new_test_client(capture_buffer: @captured_commands) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, replica: true, replica_affinity: :random_with_primary, fixed_hostname: TEST_FIXED_HOSTNAME, + middlewares: [CommandCaptureMiddleware], + custom: { captured_commands: capture_buffer }, **TEST_GENERIC_OPTIONS ) ::RedisClient::Cluster.new(config) @@ -559,12 +773,14 @@ def new_test_client class ScaleReadLatency < TestingWrapper include Mixin - def new_test_client + def new_test_client(capture_buffer: @captured_commands) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, replica: true, replica_affinity: :latency, fixed_hostname: TEST_FIXED_HOSTNAME, + middlewares: [CommandCaptureMiddleware], + custom: { captured_commands: capture_buffer }, **TEST_GENERIC_OPTIONS ) ::RedisClient::Cluster.new(config) @@ -574,10 +790,12 @@ def new_test_client class Pooled < TestingWrapper include Mixin - def new_test_client + def new_test_client(capture_buffer: @captured_commands) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, fixed_hostname: TEST_FIXED_HOSTNAME, + middlewares: [CommandCaptureMiddleware], + custom: { captured_commands: capture_buffer }, **TEST_GENERIC_OPTIONS ) ::RedisClient::Cluster.new(config, pool: { timeout: TEST_TIMEOUT_SEC, size: 2 }) diff --git a/test/test_against_cluster_state.rb b/test/test_against_cluster_state.rb index 813ba4df..096e9cbf 100644 --- a/test/test_against_cluster_state.rb +++ b/test/test_against_cluster_state.rb @@ -84,6 +84,52 @@ def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection end end + def test_aborts_pinning_on_resharding + @client.call_v(['SET', '{slot}counter', 0]) + slot = ::RedisClient::Cluster::KeySlotConverter.convert('{slot}') + + assert_raises(::RedisClient::CommandError) do + @client.with(key: '{slot}') do |conn| + conn.call_v(['INCR', '{slot}counter']) # should work + src, dest = @controller.select_resharding_target(slot) + @controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest) + @controller.finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest) + # this should now fail for want of being on the wrong slot. + conn.call_v(['INCR', '{slot}counter']) + end + end + + # The second one will now work though, because we processed the resharding. + @client.with(key: '{slot}') do |conn| + conn.call_v(['INCR', '{slot}counter']) + end + + # There were two INCRs which should work. + assert_equal(2, @client.call_v(['GET', '{slot}counter']).to_i) + end + + def test_can_retry_pinning_on_resharding + @client.call_v(['SET', '{slot}counter', 0]) + slot = ::RedisClient::Cluster::KeySlotConverter.convert('{slot}') + first_time = true + @client.with(key: '{slot}', retry_count: 1) do |conn| + conn.call_v(['INCR', '{slot}counter']) # should work + + if first_time + src, dest = @controller.select_resharding_target(slot) + @controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest) + @controller.finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest) + first_time = false + end + + # this should now fail for want of being on the wrong slot (the first time) + conn.call_v(['INCR', '{slot}counter']) + end + + # There were three INCRs which should work. + assert_equal(3, @client.call_v(['GET', '{slot}counter']).to_i) + end + private def wait_for_replication diff --git a/test/testing_helper.rb b/test/testing_helper.rb index 76f02e75..4d09ff23 100644 --- a/test/testing_helper.rb +++ b/test/testing_helper.rb @@ -6,6 +6,7 @@ require 'redis-cluster-client' require 'testing_constants' require 'cluster_controller' +require 'command_capture_middleware' case ENV.fetch('REDIS_CONNECTION_DRIVER', 'ruby') when 'hiredis' then require 'hiredis-client'