Skip to content

Support watch & unwatch properly #295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Metrics/ClassLength:

Metrics/ModuleLength:
Max: 500
Exclude:
- 'test/**/*'

Metrics/MethodLength:
Max: 50
Expand Down
60 changes: 49 additions & 11 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,37 @@ def inspect

def call(*args, **kwargs, &block)
command = @command_builder.generate(args, kwargs)
@router.send_command(:call_v, command, &block)
send_command(:call_v, command, &block)
end

def call_v(command, &block)
command = @command_builder.generate(command)
@router.send_command(:call_v, command, &block)
send_command(:call_v, command, &block)
end

def call_once(*args, **kwargs, &block)
command = @command_builder.generate(args, kwargs)
@router.send_command(:call_once_v, command, &block)
send_command(:call_once_v, command, &block)
end

def call_once_v(command, &block)
command = @command_builder.generate(command)
@router.send_command(:call_once_v, command, &block)
send_command(:call_once_v, command, &block)
end

def blocking_call(timeout, *args, **kwargs, &block)
command = @command_builder.generate(args, kwargs)
@router.send_command(:blocking_call_v, command, timeout, &block)
send_command(:blocking_call_v, command, timeout, &block)
end

def blocking_call_v(timeout, command, &block)
command = @command_builder.generate(command)
@router.send_command(:blocking_call_v, command, timeout, &block)
send_command(:blocking_call_v, command, timeout, &block)
end

def scan(*args, **kwargs, &block)
raise ArgumentError, 'block required' unless block
raise ::RedisClient::Cluster::Transaction::ConsistencyError, 'scan is not valid inside a transaction' if @transaction

seed = Random.new_seed
cursor = ZERO_CURSOR_FOR_SCAN
Expand All @@ -66,17 +67,17 @@ def scan(*args, **kwargs, &block)
end

def sscan(key, *args, **kwargs, &block)
node = @router.assign_node(['SSCAN', key])
node = assign_node(['SSCAN', key])
@router.try_delegate(node, :sscan, key, *args, **kwargs, &block)
end

def hscan(key, *args, **kwargs, &block)
node = @router.assign_node(['HSCAN', key])
node = assign_node(['HSCAN', key])
@router.try_delegate(node, :hscan, key, *args, **kwargs, &block)
end

def zscan(key, *args, **kwargs, &block)
node = @router.assign_node(['ZSCAN', key])
node = assign_node(['ZSCAN', key])
@router.try_delegate(node, :zscan, key, *args, **kwargs, &block)
end

Expand All @@ -90,7 +91,21 @@ def pipelined
end

def multi(watch: nil, &block)
::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block)
# This is tricky and importnat.
# If you call #multi _outside_ of a watch block, you expect that the connection
# has returned to its original state at the end of the block; what was watched with the watch:
# kwarg is unwatched if the transaction is not committed.
# However, if you call #multi in a watch block (from Redis::Cluster), Redis::Cluster#watch actually
# calls unwatch if an exception gets thrown, _including an exception from inside a multi block_.
# So, we need to record whether a transaction already existed before calling multi; if so, we leave
# responsibility for disposing of that transaction to the caller who created it.
transaction_was_preexisting = [email protected]?
build_transaction!
begin
@transaction.multi(watch: watch, &block)
ensure
@transaction = nil if @transaction&.complete? || !transaction_was_preexisting
end
end

def pubsub
Expand All @@ -109,7 +124,7 @@ def method_missing(name, *args, **kwargs, &block)
if @router.command_exists?(name)
args.unshift(name)
command = @command_builder.generate(args, kwargs)
return @router.send_command(:call_v, command, &block)
return send_command(:call_v, command, &block)
end

super
Expand All @@ -120,5 +135,28 @@ def respond_to_missing?(name, include_private = false)

super
end

def build_transaction!
return if @transaction

@transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder)
end

def send_command(method, command, *args, &block)
build_transaction! if ::RedisClient::Cluster::Transaction.command_starts_transaction?(command)
if @transaction
begin
@transaction.send_command(method, command, *args, &block)
ensure
@transaction = nil if @transaction&.complete?
end
else
@router.send_command(method, command, *args, &block)
end
end

def assign_node(command)
@transaction ? @transaction.node : @router.assign_node(command)
end
end
end
54 changes: 53 additions & 1 deletion lib/redis_client/cluster/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ class Command
LEFT_BRACKET = '{'
RIGHT_BRACKET = '}'
EMPTY_HASH = {}.freeze
EMPTY_ARRAY = [].freeze
PRIMARY_COMMANDS = %w[watch unwatch multi exec discard].freeze

Detail = Struct.new(
'RedisCommand',
:first_key_position,
:last_key_position,
:key_step,
:write?,
:readonly?,
keyword_init: true
Expand Down Expand Up @@ -50,6 +54,8 @@ def parse_command_reply(rows)

acc[row[0].downcase] = ::RedisClient::Cluster::Command::Detail.new(
first_key_position: row[3],
last_key_position: row[4],
key_step: row[5],
write?: row[2].include?('write'),
readonly?: row[2].include?('readonly')
)
Expand All @@ -70,9 +76,20 @@ def extract_first_key(command)
hash_tag.empty? ? key : hash_tag
end

def extract_all_keys(command)
keys_start = determine_first_key_position(command)
keys_end = determine_last_key_position(command, keys_start)
keys_step = determine_key_step(command)
return EMPTY_ARRAY if [keys_start, keys_end, keys_step].any?(&:zero?)

keys_end = [keys_end, command.size - 1].min
# use .. inclusive range because keys_end is a valid index.
(keys_start..keys_end).step(keys_step).map { |i| command[i] }
end

def should_send_to_primary?(command)
name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
@commands[name]&.write?
@commands[name]&.write? || PRIMARY_COMMANDS.include?(name)
end

def should_send_to_replica?(command)
Expand Down Expand Up @@ -101,6 +118,41 @@ def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticCo
end
end

# IMPORTANT: this determines the last key position INCLUSIVE of the last key -
# i.e. command[determine_last_key_position(command)] is a key.
# This is in line with what Redis returns from COMMANDS.
def determine_last_key_position(command, keys_start) # rubocop:disable Metrics/AbcSize
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't end up needing this method but it was tricky enough to write, and a natural enough analogue of determine_first_key_position, that i'm almost tempted to suggest it be committed in case it's needed in the future.

case name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
when 'eval', 'evalsha', 'zinterstore', 'zunionstore'
# EVALSHA sha1 numkeys [key [key ...]] [arg [arg ...]]
# ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE <SUM | MIN | MAX>]
command[2].to_i + 2
when 'object', 'memory'
# OBJECT [ENCODING | FREQ | IDLETIME | REFCOUNT] key
# MEMORY USAGE key [SAMPLES count]
keys_start
when 'migrate'
# MIGRATE host port <key | ""> destination-db timeout [COPY] [REPLACE] [AUTH password | AUTH2 username password] [KEYS key [key ...]]
command[3].empty? ? (command.length - 1) : 3
when 'xread', 'xreadgroup'
# XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
keys_start + ((command.length - keys_start) / 2) - 1
else
# If there is a fixed, non-variable number of keys, don't iterate past that.
if @commands[name].last_key_position >= 0
@commands[name].last_key_position
else
command.length - 1
end
end
end

def determine_key_step(command)
name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
# Some commands like EVALSHA have zero as the step in COMMANDS somehow.
@commands[name].key_step == 0 ? 1 : @commands[name].key_step
end

def determine_optional_key_position(command, option_name) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
idx = command&.flatten&.map(&:to_s)&.map(&:downcase)&.index(option_name&.downcase)
idx.nil? ? 0 : idx + 1
Expand Down
28 changes: 12 additions & 16 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,56 +83,52 @@ def try_send(node, method, command, args, retry_count: 3, &block) # rubocop:disa
rescue ::RedisClient::CircuitBreaker::OpenCircuitError
raise
rescue ::RedisClient::CommandError => e
raise if retry_count <= 0

if e.message.start_with?('MOVED')
node = assign_redirection_node(e.message)
retry_count -= 1
retry
retry if retry_count >= 0
elsif e.message.start_with?('ASK')
node = assign_asking_node(e.message)
node.call('ASKING')
retry_count -= 1
retry
retry if retry_count >= 0
elsif e.message.start_with?('CLUSTERDOWN Hash slot not served')
update_cluster_info!
retry_count -= 1
retry
else
raise
retry if retry_count >= 0
end
raise
rescue ::RedisClient::ConnectionError => e
raise if METHODS_FOR_BLOCKING_CMD.include?(method) && e.is_a?(RedisClient::ReadTimeoutError)
raise if retry_count <= 0

update_cluster_info!

raise if retry_count <= 0

retry_count -= 1
retry
end

def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block) # rubocop:disable Metrics/AbcSize
def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
node.public_send(method, *args, **kwargs, &block)
rescue ::RedisClient::CircuitBreaker::OpenCircuitError
raise
rescue ::RedisClient::CommandError => e
raise if retry_count <= 0

if e.message.start_with?('MOVED')
node = assign_redirection_node(e.message)
retry_count -= 1
retry
retry if retry_count >= 0
elsif e.message.start_with?('ASK')
node = assign_asking_node(e.message)
node.call('ASKING')
retry_count -= 1
retry
retry if retry_count >= 0
elsif e.message.start_with?('CLUSTERDOWN Hash slot not served')
update_cluster_info!
retry_count -= 1
retry
else
raise
retry if retry_count >= 0
end
raise
rescue ::RedisClient::ConnectionError
raise if retry_count <= 0

Expand Down
Loading