Skip to content

Implement "pinning" cluster connection to a single node - RedisClient::Cluster#with #298

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Metrics/ClassLength:

Metrics/ModuleLength:
Max: 500
Exclude:
- 'test/**/*'

Metrics/MethodLength:
Max: 50
Expand Down
88 changes: 88 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,94 @@ cli.call('MGET', '{key}1', '{key}2', '{key}3')
#=> [nil, nil, nil]
```

## Transactions
This gem supports [Redis transactions](https://redis.io/topics/transactions), including atomicity with `MULTI`/`EXEC`,
and conditional execution with `WATCH`. Redis does not support cross-slot transactions, so all keys used within a
transaction must live in the same key slot. To use transactions, you must thus "pin" your client to a single slot using
`#with`. Pass a key to `#with`, and the yielded connection object will enforce that all keys used in the block must
belong to the same slot.

```ruby
cli.with(key: '{key}1') do |conn|
conn.call('SET', '{key}1', 'This is OK')
#=> "OK"
conn.call('SET', '{key}2', 'This is also OK; it has the same hashtag, and so the same slot')
#=> "OK"
conn.call('SET', '{otherslot}3', 'This will raise an exception; this key is not in the same slot as {key}1')
#=> Connection pinned to slot 12539 but command ["SET", "{otherslot}3", "..."] includes key {otherslot}3 in slot 10271 (RedisClient::Cluster::Transaction::ConsistencyError)
end
```

When using hash tags to enforce same-slot key hashing, it's often neat to simply pass the hashtag _only_ to `#with`:

```ruby
cli.with(key: '{myslot}') do |conn|
# You can use any key starting with {myslot} in this block
end
```

Once you have pinned a client to a particular slot, you can use the same transaction APIs as the
[redis-client](https://github.com/redis-rb/redis-client#usage) gem allows.

```ruby
# No concurrent client will ever see the value 1 in 'mykey'; it will see either zero or two.
cli.call('SET', 'key', 0)
cli.with(key: 'key') do |conn|
conn.multi do |txn|
txn.call('INCR', 'key')
txn.call('INCR', 'key')
end
#=> ['OK', 'OK']
end

# Conditional execution with WATCH can be used to e.g. atomically swap two keys
cli.call('MSET', '{key}1', 'v1', '{key}2', 'v2')
cli.with(key: '{key}') do |conn|
conn.call('WATCH', '{key}1', '{key}2')
conn.multi do |txn|
old_key1 = conn.call('GET', '{key}1')
old_key2 = conn.call('GET', '{key}2')
txn.call('SET', '{key}1', old_key2)
txn.call('SET', '{key}2', old_key1)
end
# This transaction will swap the values of {key}1 and {key}2 only if no concurrent connection modified
# either of the values
end

# You can also pass watch: to #multi as a shortcut
cli.call('MSET', '{key}1', 'v1', '{key}2', 'v2')
cli.with(key: '{key}') do |conn|
conn.multi(watch: ['{key}1', '{key}2']) do |txn|
old_key1, old_key2 = conn.call('MGET', '{key}1', '{key}2')
txn.call('MSET', '{key}1', old_key2, '{key}2', old_key1)
end
end
```

Pinned connections are aware of redirections and node failures like ordinary calls to `RedisClient::Cluster`, but because
you may have written non-idempotent code inside your block, the block is not automatically retried if e.g. the slot
it is operating on moves to a different node. If you want this, you can opt-in to retries by passing nonzero
`retry_count` to `#with`.

```ruby
cli.with(key: '{key}', retry_count: 1) do |conn|
conn.call('GET', '{key}1')
#=> "value1"

# Now, some changes in cluster topology mean that {key} is moved to a different node!

conn.call('GET', '{key}2')
#=> MOVED 9039 127.0.0.1:16381 (RedisClient::CommandError)

# Luckily, the block will get retried (once) and so both GETs will be re-executed on the newly-discovered
# correct node.
end
```

Because `RedisClient` from the redis-client gem implements `#with` as simply `yield self` and ignores all of its
arguments, it's possible to write code which is compatible with both redis-client and redis-cluster-client; the `#with`
call will pin the connection to a slot when using clustering, or be a no-op when not.

## ACL
The cluster client internally calls [COMMAND](https://redis.io/commands/command/) and [CLUSTER NODES](https://redis.io/commands/cluster-nodes/) commands to operate correctly.
So please permit it like the followings.
Expand Down
26 changes: 23 additions & 3 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# frozen_string_literal: true

require 'redis_client/cluster/concurrent_worker'
require 'redis_client/cluster/pinning'
require 'redis_client/cluster/pipeline'
require 'redis_client/cluster/pub_sub'
require 'redis_client/cluster/router'
require 'redis_client/cluster/transaction'

class RedisClient
class Cluster
Expand Down Expand Up @@ -89,14 +89,34 @@ def pipelined
pipeline.execute
end

def multi(watch: nil, &block)
::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block)
def multi(watch: nil, key: nil, &block)
slot_key = key
slot_key = watch&.first if slot_key.nil?
raise ArgumentError, 'watch or key must be provided' if slot_key.nil?

with(key: slot_key) do |conn|
conn.multi(watch: watch, &block)
end
end

def pubsub
::RedisClient::Cluster::PubSub.new(@router, @command_builder)
end

def with(key:, write: true, retry_count: 0)
raise ArgumentError, 'key must be provided' if key.nil? || key.empty?

slot = ::RedisClient::Cluster::KeySlotConverter.convert(key)
node_key = @router.find_node_key_by_key(key, primary: write)
node = @router.find_node(node_key)
# Calling #with checks out the underlying connection if this is a pooled connection
# Calling it through #try_delegate ensures we handle any redirections and retry the entire
# transaction if so.
@router.try_delegate(node, :with, retry_count: retry_count) do |conn|
yield ::RedisClient::Cluster::Pinning::ConnectionProxy.new(conn, slot, @router, @command_builder)
end
end

def close
@concurrent_worker.close
@router.close
Expand Down
70 changes: 53 additions & 17 deletions lib/redis_client/cluster/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@

require 'redis_client'
require 'redis_client/cluster/errors'
require 'redis_client/cluster/key_slot_converter'
require 'redis_client/cluster/normalized_cmd_name'

class RedisClient
class Cluster
class Command
EMPTY_STRING = ''
LEFT_BRACKET = '{'
RIGHT_BRACKET = '}'
EMPTY_HASH = {}.freeze
EMPTY_ARRAY = [].freeze

Detail = Struct.new(
'RedisCommand',
:first_key_position,
:last_key_position,
:key_step,
:write?,
:readonly?,
keyword_init: true
Expand Down Expand Up @@ -50,6 +52,8 @@ def parse_command_reply(rows)

acc[row[0].downcase] = ::RedisClient::Cluster::Command::Detail.new(
first_key_position: row[3],
last_key_position: row[4],
key_step: row[5],
write?: row[2].include?('write'),
readonly?: row[2].include?('readonly')
)
Expand All @@ -65,9 +69,18 @@ def extract_first_key(command)
i = determine_first_key_position(command)
return EMPTY_STRING if i == 0

key = (command[i].is_a?(Array) ? command[i].flatten.first : command[i]).to_s
hash_tag = extract_hash_tag(key)
hash_tag.empty? ? key : hash_tag
(command[i].is_a?(Array) ? command[i].flatten.first : command[i]).to_s
end

def extract_all_keys(command)
keys_start = determine_first_key_position(command)
keys_end = determine_last_key_position(command, keys_start)
keys_step = determine_key_step(command)
return EMPTY_ARRAY if [keys_start, keys_end, keys_step].any?(&:zero?)

keys_end = [keys_end, command.size - 1].min
# use .. inclusive range because keys_end is a valid index.
(keys_start..keys_end).step(keys_step).map { |i| command[i] }
end

def should_send_to_primary?(command)
Expand Down Expand Up @@ -101,21 +114,44 @@ def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticCo
end
end

def determine_optional_key_position(command, option_name) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
idx = command&.flatten&.map(&:to_s)&.map(&:downcase)&.index(option_name&.downcase)
idx.nil? ? 0 : idx + 1
# IMPORTANT: this determines the last key position INCLUSIVE of the last key -
# i.e. command[determine_last_key_position(command)] is a key.
# This is in line with what Redis returns from COMMANDS.
def determine_last_key_position(command, keys_start) # rubocop:disable Metrics/AbcSize
case name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
when 'eval', 'evalsha', 'zinterstore', 'zunionstore'
# EVALSHA sha1 numkeys [key [key ...]] [arg [arg ...]]
# ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE <SUM | MIN | MAX>]
command[2].to_i + 2
when 'object', 'memory'
# OBJECT [ENCODING | FREQ | IDLETIME | REFCOUNT] key
# MEMORY USAGE key [SAMPLES count]
keys_start
when 'migrate'
# MIGRATE host port <key | ""> destination-db timeout [COPY] [REPLACE] [AUTH password | AUTH2 username password] [KEYS key [key ...]]
command[3].empty? ? (command.length - 1) : 3
when 'xread', 'xreadgroup'
# XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
keys_start + ((command.length - keys_start) / 2) - 1
else
# If there is a fixed, non-variable number of keys, don't iterate past that.
if @commands[name].last_key_position >= 0
@commands[name].last_key_position
else
command.length + @commands[name].last_key_position
end
end
end

# @see https://redis.io/topics/cluster-spec#keys-hash-tags Keys hash tags
def extract_hash_tag(key)
key = key.to_s
s = key.index(LEFT_BRACKET)
return EMPTY_STRING if s.nil?

e = key.index(RIGHT_BRACKET, s + 1)
return EMPTY_STRING if e.nil?
def determine_key_step(command)
name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
# Some commands like EVALSHA have zero as the step in COMMANDS somehow.
@commands[name].key_step == 0 ? 1 : @commands[name].key_step
end

key[s + 1..e - 1]
def determine_optional_key_position(command, option_name) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
idx = command&.flatten&.map(&:to_s)&.map(&:downcase)&.index(option_name&.downcase)
idx.nil? ? 0 : idx + 1
end
end
end
Expand Down
6 changes: 6 additions & 0 deletions lib/redis_client/cluster/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,11 @@ def initialize(_ = '')
)
end
end

class BlockingReadTimeoutError < ::RedisClient::ReadTimeoutError; end

module Transaction
ConsistencyError = Class.new(::RedisClient::Error)
end
end
end
18 changes: 18 additions & 0 deletions lib/redis_client/cluster/key_slot_converter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
class RedisClient
class Cluster
module KeySlotConverter
EMPTY_STRING = ''
LEFT_BRACKET = '{'
RIGHT_BRACKET = '}'
XMODEM_CRC16_LOOKUP = [
0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
Expand Down Expand Up @@ -45,13 +48,28 @@ module KeySlotConverter
def convert(key)
return nil if key.nil?

hash_tag = extract_hash_tag(key)
key = hash_tag unless hash_tag.empty?

crc = 0
key.each_byte do |b|
crc = ((crc << 8) & 0xffff) ^ XMODEM_CRC16_LOOKUP[((crc >> 8) ^ b) & 0xff]
end

crc % HASH_SLOTS
end

# @see https://redis.io/topics/cluster-spec#keys-hash-tags Keys hash tags
def extract_hash_tag(key)
key = key.to_s
s = key.index(LEFT_BRACKET)
return EMPTY_STRING if s.nil?

e = key.index(RIGHT_BRACKET, s + 1)
return EMPTY_STRING if e.nil?

key[s + 1..e - 1]
end
end
end
end
18 changes: 17 additions & 1 deletion lib/redis_client/cluster/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def []=(index, element)
class Config < ::RedisClient::Config
def initialize(scale_read: false, **kwargs)
@scale_read = scale_read
super(**kwargs)
super(**kwargs, client_implementation: Client)
end

private
Expand All @@ -92,6 +92,22 @@ def build_connection_prelude
end
end

class Client < ::RedisClient
# We need to be able to differentiate between timeout errors caused by blocking read timeouts
# (which should NOT cause a cluster topology update) with normal read timeouts (which should)
def blocking_call(timeout, *command, **kwargs)
super
rescue ::RedisClient::TimeoutError => e
raise ::RedisClient::Cluster::BlockingReadTimeoutError, e.message
end

def blocking_call_v(timeout, command)
super
rescue ::RedisClient::TimeoutError => e
raise ::RedisClient::Cluster::BlockingReadTimeoutError, e.message
end
end

class << self
def load_info(options, concurrent_worker, slow_command_timeout: -1, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
raise ::RedisClient::Cluster::InitialSetupError, [] if options.nil? || options.empty?
Expand Down
Loading